You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:22:41 UTC
[2/4] Native protocol v3
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d79bd5b..b9ccd1a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -185,23 +185,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
ConsistencyLevel cl = options.getConsistency();
- List<ByteBuffer> variables = options.getValues();
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
cl.validateForRead(keyspace());
- int limit = getLimit(variables);
+ int limit = getLimit(options);
int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
Pageable command;
if (isKeyRange || usesSecondaryIndexing)
{
- command = getRangeCommand(variables, limitForQuery, now);
+ command = getRangeCommand(options, limitForQuery, now);
}
else
{
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+ List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
command = commands == null ? null : new Pageable.ReadCommands(commands);
}
@@ -214,13 +213,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
{
- return execute(command, cl, variables, limit, now);
+ return execute(command, options, limit, now);
}
else
{
QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
if (parameters.isCount)
- return pageCountQuery(pager, variables, pageSize, now, limit);
+ return pageCountQuery(pager, options, pageSize, now, limit);
// We can't properly do post-query ordering if we page (see #6722)
if (needsPostQueryOrdering())
@@ -228,14 +227,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
+ "ORDER BY or the IN and sort client side, or disable paging for this query");
List<Row> page = pager.fetchPage(pageSize);
- ResultMessage.Rows msg = processResults(page, variables, limit, now);
+ ResultMessage.Rows msg = processResults(page, options, limit, now);
if (!pager.isExhausted())
msg.result.metadata.setHasMorePages(pager.state());
return msg;
}
}
- private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
{
List<Row> rows;
if (command == null)
@@ -245,21 +244,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else
{
rows = command instanceof Pageable.ReadCommands
- ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
- : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+ ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency())
+ : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
}
- return processResults(rows, variables, limit, now);
+ return processResults(rows, options, limit, now);
}
- private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows pageCountQuery(QueryPager pager, QueryOptions options, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
{
int count = 0;
while (!pager.isExhausted())
{
int maxLimit = pager.maxRemaining();
logger.debug("New maxLimit for paged count query is {}", maxLimit);
- ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
+ ResultSet rset = process(pager.fetchPage(pageSize), options, maxLimit, now);
count += rset.rows.size();
}
@@ -269,10 +268,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return new ResultMessage.Rows(result);
}
- public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some column together in sparse column families
- ResultSet rset = process(rows, variables, limit, now);
+ ResultSet rset = process(rows, options, limit, now);
rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
return new ResultMessage.Rows(rset);
}
@@ -288,29 +287,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = Collections.emptyList();
- int limit = getLimit(variables);
+ QueryOptions options = QueryOptions.DEFAULT;
+ int limit = getLimit(options);
int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
List<Row> rows;
if (isKeyRange || usesSecondaryIndexing)
{
- RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now);
+ RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
}
else
{
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+ List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
}
- return processResults(rows, variables, limit, now);
+ return processResults(rows, options, limit, now);
}
public ResultSet process(List<Row> rows) throws InvalidRequestException
{
assert !parameters.isCount; // not yet needed
- return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
+ QueryOptions options = QueryOptions.DEFAULT;
+ return process(rows, options, getLimit(options), System.currentTimeMillis());
}
public String keyspace()
@@ -323,15 +323,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return cfm.cfName;
}
- private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
{
- Collection<ByteBuffer> keys = getKeys(variables);
+ Collection<ByteBuffer> keys = getKeys(options);
if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
return null;
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
- IDiskAtomFilter filter = makeFilter(variables, limit);
+ IDiskAtomFilter filter = makeFilter(options, limit);
if (filter == null)
return null;
@@ -349,29 +349,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return commands;
}
- private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
{
- IDiskAtomFilter filter = makeFilter(variables, limit);
+ IDiskAtomFilter filter = makeFilter(options, limit);
if (filter == null)
return null;
- List<IndexExpression> expressions = getIndexExpressions(variables);
+ List<IndexExpression> expressions = getIndexExpressions(options);
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
- AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+ AbstractBounds<RowPosition> keyBounds = getKeyBounds(options);
return keyBounds == null
? null
: new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
}
- private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
+ private AbstractBounds<RowPosition> getKeyBounds(QueryOptions options) throws InvalidRequestException
{
IPartitioner<?> p = StorageService.getPartitioner();
if (onToken)
{
- Token startToken = getTokenBound(Bound.START, variables, p);
- Token endToken = getTokenBound(Bound.END, variables, p);
+ Token startToken = getTokenBound(Bound.START, options, p);
+ Token endToken = getTokenBound(Bound.END, options, p);
boolean includeStart = includeKeyBound(Bound.START);
boolean includeEnd = includeKeyBound(Bound.END);
@@ -397,8 +397,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
- ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
+ ByteBuffer startKeyBytes = getKeyBound(Bound.START, options);
+ ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options);
RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
@@ -421,7 +421,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
- private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
+ private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
throws InvalidRequestException
{
if (parameters.isDistinct)
@@ -431,8 +431,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else if (isColumnRange())
{
int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
- List<Composite> startBounds = getRequestedBound(Bound.START, variables);
- List<Composite> endBounds = getRequestedBound(Bound.END, variables);
+ List<Composite> startBounds = getRequestedBound(Bound.START, options);
+ List<Composite> endBounds = getRequestedBound(Bound.END, options);
assert startBounds.size() == endBounds.size();
// Handles fetching static columns. Note that for 2i, the filter is just used to restrict
@@ -516,7 +516,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- SortedSet<CellName> cellNames = getRequestedColumns(variables);
+ SortedSet<CellName> cellNames = getRequestedColumns(options);
if (cellNames == null) // in case of IN () for the last column of the key
return null;
QueryProcessor.validateCellNames(cellNames, cfm.comparator);
@@ -534,12 +534,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return new SliceQueryFilter(slices, isReversed, limit, toGroup);
}
- private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
+ private int getLimit(QueryOptions options) throws InvalidRequestException
{
int l = Integer.MAX_VALUE;
if (limit != null)
{
- ByteBuffer b = limit.bindAndGet(variables);
+ ByteBuffer b = limit.bindAndGet(options);
if (b == null)
throw new InvalidRequestException("Invalid null value of limit");
@@ -569,7 +569,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
: limit;
}
- private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
+ private Collection<ByteBuffer> getKeys(final QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
CBuilder builder = cfm.getKeyValidatorAsCType().builder();
@@ -578,7 +578,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Restriction r = keyRestrictions[def.position()];
assert r != null && !r.isSlice();
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (builder.remainingCount() == 1)
{
@@ -603,7 +603,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return keys;
}
- private ByteBuffer getKeyBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
{
// Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first
// component of a composite partition key).
@@ -612,10 +612,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
// We deal with IN queries for keys in other places, so we know buildBound will return only one result
- return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
+ return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer();
}
- private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
+ private Token getTokenBound(Bound b, QueryOptions options, IPartitioner<?> p) throws InvalidRequestException
{
assert onToken;
@@ -623,7 +623,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ByteBuffer value;
if (keyRestriction.isEQ())
{
- value = keyRestriction.values(variables).get(0);
+ value = keyRestriction.values(options).get(0);
}
else
{
@@ -631,7 +631,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (!slice.hasBound(b))
return p.getMinimumToken();
- value = slice.bound(b, variables);
+ value = slice.bound(b, options);
}
if (value == null)
@@ -669,7 +669,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return false;
}
- private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+ private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
{
// Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
// we always do a slice for CQL3 tables, so it's ok to ignore them here
@@ -682,7 +682,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ColumnDefinition def = idIter.next();
assert r != null && !r.isSlice();
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (values.size() == 1)
{
ByteBuffer val = values.get(0);
@@ -772,7 +772,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Restriction[] restrictions,
boolean isReversed,
CType type,
- List<ByteBuffer> variables) throws InvalidRequestException
+ QueryOptions options) throws InvalidRequestException
{
CBuilder builder = type.builder();
@@ -801,7 +801,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (r.isSlice())
{
- builder.add(getSliceValue(def, r, b, variables));
+ builder.add(getSliceValue(def, r, b, options));
Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b);
// We can have more non null restriction if the "scalar" notation was used for the bound (#4851).
@@ -813,13 +813,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (isNullRestriction(r, b))
break;
- builder.add(getSliceValue(def, r, b, variables));
+ builder.add(getSliceValue(def, r, b, options));
}
return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
}
else
{
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (values.size() != 1)
{
// IN query, we only support it on the clustering column
@@ -878,23 +878,23 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
}
- private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
{
Restriction.Slice slice = (Restriction.Slice)r;
assert slice.hasBound(b);
- ByteBuffer val = slice.bound(b, variables);
+ ByteBuffer val = slice.bound(b, options);
if (val == null)
throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
return val;
}
- private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
{
assert isColumnRange();
- return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
+ return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
}
- public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<IndexExpression> getIndexExpressions(QueryOptions options) throws InvalidRequestException
{
if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
return Collections.emptyList();
@@ -927,7 +927,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
if (slice.hasBound(b))
{
- ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
+ ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
}
}
@@ -935,12 +935,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else if (restriction.isContains())
{
Restriction.Contains contains = (Restriction.Contains)restriction;
- for (ByteBuffer value : contains.values(variables))
+ for (ByteBuffer value : contains.values(options))
{
validateIndexedValue(def, value);
expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
}
- for (ByteBuffer key : contains.keys(variables))
+ for (ByteBuffer key : contains.keys(options))
{
validateIndexedValue(def, key);
expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
@@ -948,7 +948,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- List<ByteBuffer> values = restriction.values(variables);
+ List<ByteBuffer> values = restriction.values(options);
if (values.size() != 1)
throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
@@ -969,13 +969,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return value;
}
- private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final List<ByteBuffer> variables) throws InvalidRequestException
+ private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
{
assert sliceRestriction != null;
final CellNameType type = cfm.comparator;
- final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
- final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
+ final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, options));
+ final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, options));
return new AbstractIterator<Cell>()
{
@@ -998,7 +998,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
};
}
- private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
+ private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
{
Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
for (org.apache.cassandra.db.Row row : rows)
@@ -1007,12 +1007,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (row.cf == null)
continue;
- processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
+ processColumnFamily(row.key.getKey(), row.cf, options, now, result);
}
ResultSet cqlRows = result.build();
- orderResults(cqlRows, variables);
+ orderResults(cqlRows);
// Internal calls always return columns in the comparator order, even when reverse was set
if (isReversed)
@@ -1024,7 +1024,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
// Used by ModificationStatement for CAS operations
- void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
+ void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
throws InvalidRequestException
{
CFMetaData cfm = cf.metadata();
@@ -1040,7 +1040,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Iterator<Cell> cells = cf.getSortedColumns().iterator();
if (sliceRestriction != null)
- cells = applySliceRestriction(cells, variables);
+ cells = applySliceRestriction(cells, options);
CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
@@ -1059,7 +1059,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
result.add(keyComponents[def.position()]);
break;
case STATIC:
- addValue(result, def, staticRow);
+ addValue(result, def, staticRow, options);
break;
default:
result.add((ByteBuffer)null);
@@ -1089,17 +1089,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
result.add(cql3Row.getColumn(null));
break;
case REGULAR:
- addValue(result, def, cql3Row);
+ addValue(result, def, cql3Row, options);
break;
case STATIC:
- addValue(result, def, staticRow);
+ addValue(result, def, staticRow, options);
break;
}
}
}
}
- private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row)
+ private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
{
if (row == null)
{
@@ -1112,7 +1112,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
List<Cell> collection = row.getCollection(def.name);
ByteBuffer value = collection == null
? null
- : ((CollectionType)def.type).serialize(collection);
+ : ((CollectionType)def.type).serializeForNativeProtocol(collection, options.getProtocolVersion());
result.add(value);
return;
}
@@ -1137,7 +1137,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
/**
* Orders results when multiple keys are selected (using IN)
*/
- private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
+ private void orderResults(ResultSet cqlRows) throws InvalidRequestException
{
if (cqlRows.size() == 0 || !needsPostQueryOrdering())
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 6f9a270..3902e05 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -362,7 +362,7 @@ public class DefsTables
dropType(type);
for (MapDifference.ValueDifference<UserType> tdiff : typesDiff.entriesDiffering().values())
- addType(tdiff.rightValue()); // use the most recent value
+ updateType(tdiff.rightValue()); // use the most recent value
}
}
}
@@ -412,7 +412,7 @@ public class DefsTables
ksm.userTypes.addType(ut);
if (!StorageService.instance.isClientMode())
- MigrationManager.instance.notifyUpdateKeyspace(ksm);
+ MigrationManager.instance.notifyCreateUserType(ut);
}
private static void updateKeyspace(KSMetaData newState)
@@ -444,6 +444,19 @@ public class DefsTables
}
}
+ private static void updateType(UserType ut)
+ {
+ KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Updating {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ if (!StorageService.instance.isClientMode())
+ MigrationManager.instance.notifyUpdateUserType(ut);
+ }
+
private static void dropKeyspace(String ksName)
{
KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -515,7 +528,7 @@ public class DefsTables
ksm.userTypes.removeType(ut);
if (!StorageService.instance.isClientMode())
- MigrationManager.instance.notifyUpdateKeyspace(ksm);
+ MigrationManager.instance.notifyUpdateUserType(ut);
}
private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 5db4ba0..7f75a5f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -57,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
protected abstract void appendToStringBuilder(StringBuilder sb);
- public abstract ByteBuffer serialize(List<Cell> cells);
+ public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
@Override
public String toString()
@@ -110,22 +111,9 @@ public abstract class CollectionType<T> extends AbstractType<T>
return true;
}
- // Utilitary method
- protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+ protected List<Cell> enforceLimit(List<Cell> cells, int version)
{
- ByteBuffer result = ByteBuffer.allocate(2 + size);
- result.putShort((short)elements);
- for (ByteBuffer bb : buffers)
- {
- result.putShort((short)bb.remaining());
- result.put(bb.duplicate());
- }
- return (ByteBuffer)result.flip();
- }
-
- protected List<Cell> enforceLimit(List<Cell> cells)
- {
- if (cells.size() <= MAX_ELEMENTS)
+ if (version >= 3 || cells.size() <= MAX_ELEMENTS)
return cells;
logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. "
@@ -133,12 +121,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
return cells.subList(0, MAX_ELEMENTS);
}
- public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+ public ByteBuffer serializeForNativeProtocol(List<Cell> cells, int version)
{
- int size = 0;
- for (ByteBuffer bb : buffers)
- size += 2 + bb.remaining();
- return pack(buffers, elements, size);
+ cells = enforceLimit(cells, version);
+ List<ByteBuffer> values = serializedValues(cells);
+ return CollectionSerializer.pack(values, cells.size(), version);
}
public CQL3Type asCQL3Type()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 43ace65..6e6821b 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -72,7 +72,7 @@ public class ListType<T> extends CollectionType<List<T>>
return elements;
}
- public TypeSerializer<List<T>> getSerializer()
+ public ListSerializer<T> getSerializer()
{
return serializer;
}
@@ -112,17 +112,11 @@ public class ListType<T> extends CollectionType<List<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- int size = 0;
for (Cell c : cells)
- {
bbs.add(c.value());
- size += 2 + c.value().remaining();
- }
- return pack(bbs, cells.size(), size);
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 213e213..71023a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -108,7 +108,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
}
@Override
- public TypeSerializer<Map<K, V>> getSerializer()
+ public MapSerializer<K, V> getSerializer()
{
return serializer;
}
@@ -123,23 +123,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values)));
}
- /**
- * Creates the same output than serialize, but from the internal representation.
- */
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * cells.size());
- int size = 0;
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
for (Cell c : cells)
{
- ByteBuffer key = c.name().collectionElement();
- ByteBuffer value = c.value();
- bbs.add(key);
- bbs.add(value);
- size += 4 + key.remaining() + value.remaining();
+ bbs.add(c.name().collectionElement());
+ bbs.add(c.value());
}
- return pack(bbs, cells.size(), size);
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 3b686b8..d2f7f12 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -77,7 +77,7 @@ public class SetType<T> extends CollectionType<Set<T>>
return ListType.compareListOrSet(elements, o1, o2);
}
- public TypeSerializer<Set<T>> getSerializer()
+ public SetSerializer<T> getSerializer()
{
return serializer;
}
@@ -92,18 +92,11 @@ public class SetType<T> extends CollectionType<Set<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- int size = 0;
for (Cell c : cells)
- {
- ByteBuffer key = c.name().collectionElement();
- bbs.add(key);
- size += 2 + key.remaining();
- }
- return pack(bbs, cells.size(), size);
+ bbs.add(c.name().collectionElement());
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index eb95fb9..973a5be 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -64,6 +64,11 @@ public class UserType extends CompositeType
return new UserType(keyspace, name, columnNames, columnTypes);
}
+ public String getNameAsString()
+ {
+ return UTF8Type.instance.compose(name);
+ }
+
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index af88853..9e3abcf 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -431,8 +432,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
{
ByteBuffer buffer = objToBB(sub);
serialized.add(buffer);
- }
- return CollectionType.pack(serialized, objects.size());
+ }
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToMapBB(List<Object> objects)
@@ -447,7 +450,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
serialized.add(buffer);
}
}
- return CollectionType.pack(serialized, objects.size());
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToCompositeBB(List<Object> objects)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 9b7a8e7..6993b19 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -195,14 +195,15 @@ public class CQLSSTableWriter
if (values.size() != boundNames.size())
throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
- List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
- Composite clusteringPrefix = insert.createClusteringPrefix(values);
+ QueryOptions options = QueryOptions.forInternalCalls(null, values);
+ List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+ Composite clusteringPrefix = insert.createClusteringPrefix(options);
long now = System.currentTimeMillis() * 1000;
UpdateParameters params = new UpdateParameters(insert.cfm,
- values,
- insert.getTimestamp(now, values),
- insert.getTimeToLive(values),
+ options,
+ insert.getTimestamp(now, options),
+ insert.getTimeToLive(options),
Collections.<ByteBuffer, CQL3Row>emptyMap());
for (ByteBuffer key: keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 83a391d..0e16fda 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.serializers;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
public abstract class CollectionSerializer<T> implements TypeSerializer<T>
{
public void validate(ByteBuffer bytes) throws MarshalException
@@ -28,24 +30,104 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
// The collection is not currently being properly validated.
}
- // Utilitary method
- protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+ protected abstract List<ByteBuffer> serializeValues(T value);
+ protected abstract int getElementCount(T value);
+
+ public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version);
+
+ public ByteBuffer serialize(T value)
+ {
+ List<ByteBuffer> values = serializeValues(value);
+ // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+ // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+ return pack(values, getElementCount(value), 3);
+ }
+
+ public T deserialize(ByteBuffer bytes)
{
- ByteBuffer result = ByteBuffer.allocate(2 + size);
- result.putShort((short)elements);
+ // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+ // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+ return deserializeForNativeProtocol(bytes, 3);
+ }
+
+ public static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int version)
+ {
+ int size = 0;
+ for (ByteBuffer bb : buffers)
+ size += sizeOfValue(bb, version);
+
+ ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, version) + size);
+ writeCollectionSize(result, elements, version);
for (ByteBuffer bb : buffers)
+ writeValue(result, bb, version);
+ return (ByteBuffer)result.flip();
+ }
+
+ protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
+ {
+ if (version >= 3)
+ output.putInt(elements);
+ else
+ output.putShort((short)elements);
+ }
+
+ protected static int readCollectionSize(ByteBuffer input, int version)
+ {
+ return version >= 3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+ }
+
+ protected static int sizeOfCollectionSize(int elements, int version)
+ {
+ return version >= 3 ? 4 : 2;
+ }
+
+ protected static void writeValue(ByteBuffer output, ByteBuffer value, int version)
+ {
+ if (version >= 3)
{
- result.putShort((short)bb.remaining());
- result.put(bb.duplicate());
+ if (value == null)
+ {
+ output.putInt(-1);
+ return;
+ }
+
+ output.putInt(value.remaining());
+ output.put(value.duplicate());
+ }
+ else
+ {
+ assert value != null;
+ output.putShort((short)value.remaining());
+ output.put(value.duplicate());
}
- return (ByteBuffer)result.flip();
}
- public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+ protected static ByteBuffer readValue(ByteBuffer input, int version)
{
- int size = 0;
- for (ByteBuffer bb : buffers)
- size += 2 + bb.remaining();
- return pack(buffers, elements, size);
+ if (version >= 3)
+ {
+ int size = input.getInt();
+ if (size < 0)
+ return null;
+
+ return ByteBufferUtil.readBytes(input, size);
+ }
+ else
+ {
+ return ByteBufferUtil.readBytesWithShortLength(input);
+ }
+ }
+
+ protected static int sizeOfValue(ByteBuffer value, int version)
+ {
+ if (version >= 3)
+ {
+ return value == null ? 4 : 4 + value.remaining();
+ }
+ else
+ {
+ assert value != null;
+ return 2 + value.remaining();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index 59f25d2..e662341 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -47,16 +47,29 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
this.elements = elements;
}
- public List<T> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(List<T> values)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(values.size());
+ for (T value : values)
+ buffers.add(elements.serialize(value));
+ return buffers;
+ }
+
+ public int getElementCount(List<T> value)
+ {
+ return value.size();
+ }
+
+ public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
List<T> l = new ArrayList<T>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer databb = readValue(input, version);
elements.validate(databb);
l.add(elements.deserialize(databb));
}
@@ -68,26 +81,6 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
}
}
- /**
- * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
- * where:
- * n is the number of elements
- * s_i is the number of bytes composing the ith element
- * b_i is the s_i bytes composing the ith element
- */
- public ByteBuffer serialize(List<T> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
- int size = 0;
- for (T elt : value)
- {
- ByteBuffer bb = elements.serialize(elt);
- bbs.add(bb);
- size += 2 + bb.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(List<T> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index f79d07f..5d349dd 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -51,19 +51,35 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
this.values = values;
}
- public Map<K, V> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(Map<K, V> map)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(map.size() * 2);
+ for (Map.Entry<K, V> entry : map.entrySet())
+ {
+ buffers.add(keys.serialize(entry.getKey()));
+ buffers.add(values.serialize(entry.getValue()));
+ }
+ return buffers;
+ }
+
+ public int getElementCount(Map<K, V> value)
+ {
+ return value.size();
+ }
+
+ public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
Map<K, V> m = new LinkedHashMap<K, V>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer kbb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer kbb = readValue(input, version);
keys.validate(kbb);
- ByteBuffer vbb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer vbb = readValue(input, version);
values.validate(vbb);
m.put(keys.deserialize(kbb), values.deserialize(vbb));
@@ -76,30 +92,6 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
}
}
- /**
- * Layout is: {@code <n><sk_1><k_1><sv_1><v_1>...<sk_n><k_n><sv_n><v_n> }
- * where:
- * n is the number of elements
- * sk_i is the number of bytes composing the ith key k_i
- * k_i is the sk_i bytes composing the ith key
- * sv_i is the number of bytes composing the ith value v_i
- * v_i is the sv_i bytes composing the ith value
- */
- public ByteBuffer serialize(Map<K, V> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * value.size());
- int size = 0;
- for (Map.Entry<K, V> entry : value.entrySet())
- {
- ByteBuffer bbk = keys.serialize(entry.getKey());
- ByteBuffer bbv = values.serialize(entry.getValue());
- bbs.add(bbk);
- bbs.add(bbv);
- size += 4 + bbk.remaining() + bbv.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(Map<K, V> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index d6d7062..812dd68 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -47,16 +47,29 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
this.elements = elements;
}
- public Set<T> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(Set<T> values)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(values.size());
+ for (T value : values)
+ buffers.add(elements.serialize(value));
+ return buffers;
+ }
+
+ public int getElementCount(Set<T> value)
+ {
+ return value.size();
+ }
+
+ public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
Set<T> l = new LinkedHashSet<T>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer databb = readValue(input, version);
elements.validate(databb);
l.add(elements.deserialize(databb));
}
@@ -68,26 +81,6 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
}
}
- /**
- * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
- * where:
- * n is the number of elements
- * s_i is the number of bytes composing the ith element
- * b_i is the s_i bytes composing the ith element
- */
- public ByteBuffer serialize(Set<T> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
- int size = 0;
- for (T elt : value)
- {
- ByteBuffer bb = elements.serialize(elt);
- bbs.add(bb);
- size += 2 + bb.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(Set<T> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index e16ac62..4d142bd 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -21,10 +21,13 @@ public interface IMigrationListener
{
public void onCreateKeyspace(String ksName);
public void onCreateColumnFamily(String ksName, String cfName);
+ public void onCreateUserType(String ksName, String typeName);
public void onUpdateKeyspace(String ksName);
public void onUpdateColumnFamily(String ksName, String cfName);
+ public void onUpdateUserType(String ksName, String typeName);
public void onDropKeyspace(String ksName);
public void onDropColumnFamily(String ksName, String cfName);
+ public void onDropUserType(String ksName, String typeName);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 7eb7282..ec46d3f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -167,6 +167,12 @@ public class MigrationManager
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyCreateUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public void notifyUpdateKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
@@ -179,6 +185,12 @@ public class MigrationManager
listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyUpdateUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public void notifyDropKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
@@ -191,6 +203,12 @@ public class MigrationManager
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyDropUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onDropUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
{
announceNewKeyspace(ksm, FBUtilities.timestampMicros());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index c4abe0b..3040aaf 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1969,7 +1969,7 @@ public class CassandraServer implements Cassandra.Iface
}
ThriftClientState cState = state();
- return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
+ return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
}
catch (RequestExecutionException e)
{
@@ -2100,7 +2100,7 @@ public class CassandraServer implements Cassandra.Iface
return cState.getCQLQueryHandler().processPrepared(statement,
cState.getQueryState(),
- new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
+ QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
}
catch (RequestExecutionException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5222a1..36a7e71 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -363,6 +364,22 @@ public abstract class CBUtil
return size;
}
+ public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb)
+ {
+ int size = cb.readUnsignedShort();
+ if (size == 0)
+ return Pair.create(Collections.<String>emptyList(), Collections.<ByteBuffer>emptyList());
+
+ List<String> s = new ArrayList<>(size);
+ List<ByteBuffer> l = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ s.add(readString(cb));
+ l.add(readValue(cb));
+ }
+ return Pair.create(s, l);
+ }
+
public static InetSocketAddress readInet(ByteBuf cb)
{
int addrSize = cb.readByte();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4a50bde..989b954 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -128,7 +128,7 @@ public class Client extends SimpleClient
return null;
}
}
- return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
+ return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
}
else if (msgType.equals("PREPARE"))
{
@@ -156,7 +156,7 @@ public class Client extends SimpleClient
}
values.add(bb);
}
- return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values));
+ return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index f0b5d95..3cff973 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.transport;
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -51,7 +51,9 @@ public enum DataType implements OptionCodec.Codecable<DataType>
INET (16, InetAddressType.instance),
LIST (32, null),
MAP (33, null),
- SET (34, null);
+ SET (34, null),
+ UDT (48, null);
+
public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
@@ -78,27 +80,39 @@ public enum DataType implements OptionCodec.Codecable<DataType>
return id;
}
- public Object readValue(ByteBuf cb)
+ public Object readValue(ByteBuf cb, int version)
{
switch (this)
{
case CUSTOM:
return CBUtil.readString(cb);
case LIST:
- return DataType.toType(codec.decodeOne(cb));
+ return DataType.toType(codec.decodeOne(cb, version));
case SET:
- return DataType.toType(codec.decodeOne(cb));
+ return DataType.toType(codec.decodeOne(cb, version));
case MAP:
List<AbstractType> l = new ArrayList<AbstractType>(2);
- l.add(DataType.toType(codec.decodeOne(cb)));
- l.add(DataType.toType(codec.decodeOne(cb)));
+ l.add(DataType.toType(codec.decodeOne(cb, version)));
+ l.add(DataType.toType(codec.decodeOne(cb, version)));
return l;
+ case UDT:
+ String ks = CBUtil.readString(cb);
+ ByteBuffer name = UTF8Type.instance.decompose(CBUtil.readString(cb));
+ int n = cb.readUnsignedShort();
+ List<ByteBuffer> fieldNames = new ArrayList<>(n);
+ List<AbstractType<?>> fieldTypes = new ArrayList<>(n);
+ for (int i = 0; i < n; i++)
+ {
+ fieldNames.add(UTF8Type.instance.decompose(CBUtil.readString(cb)));
+ fieldTypes.add(DataType.toType(codec.decodeOne(cb, version)));
+ }
+ return new UserType(ks, name, fieldNames, fieldTypes);
default:
return null;
}
}
- public void writeValue(Object value, ByteBuf cb)
+ public void writeValue(Object value, ByteBuf cb, int version)
{
switch (this)
{
@@ -107,40 +121,63 @@ public enum DataType implements OptionCodec.Codecable<DataType>
CBUtil.writeString((String)value, cb);
break;
case LIST:
- codec.writeOne(DataType.fromType((AbstractType)value), cb);
+ codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
break;
case SET:
- codec.writeOne(DataType.fromType((AbstractType)value), cb);
+ codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
break;
case MAP:
List<AbstractType> l = (List<AbstractType>)value;
- codec.writeOne(DataType.fromType(l.get(0)), cb);
- codec.writeOne(DataType.fromType(l.get(1)), cb);
+ codec.writeOne(DataType.fromType(l.get(0), version), cb, version);
+ codec.writeOne(DataType.fromType(l.get(1), version), cb, version);
+ break;
+ case UDT:
+ UserType udt = (UserType)value;
+ CBUtil.writeString(udt.keyspace, cb);
+ CBUtil.writeString(UTF8Type.instance.compose(udt.name), cb);
+ cb.writeShort(udt.columnNames.size());
+ for (int i = 0; i < udt.columnNames.size(); i++)
+ {
+ CBUtil.writeString(UTF8Type.instance.compose(udt.columnNames.get(i)), cb);
+ codec.writeOne(DataType.fromType(udt.types.get(i), version), cb, version);
+ }
break;
}
}
- public int serializedValueSize(Object value)
+ public int serializedValueSize(Object value, int version)
{
switch (this)
{
case CUSTOM:
- return 2 + ((String)value).getBytes(StandardCharsets.UTF_8).length;
+ return CBUtil.sizeOfString((String)value);
case LIST:
case SET:
- return codec.oneSerializedSize(DataType.fromType((AbstractType)value));
+ return codec.oneSerializedSize(DataType.fromType((AbstractType)value, version), version);
case MAP:
List<AbstractType> l = (List<AbstractType>)value;
int s = 0;
- s += codec.oneSerializedSize(DataType.fromType(l.get(0)));
- s += codec.oneSerializedSize(DataType.fromType(l.get(1)));
+ s += codec.oneSerializedSize(DataType.fromType(l.get(0), version), version);
+ s += codec.oneSerializedSize(DataType.fromType(l.get(1), version), version);
return s;
+ case UDT:
+ UserType udt = (UserType)value;
+ int size = 0;
+ size += CBUtil.sizeOfString(udt.keyspace);
+ size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.name));
+ size += 2;
+ for (int i = 0; i < udt.columnNames.size(); i++)
+ {
+ size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.columnNames.get(i)));
+ size += codec.oneSerializedSize(DataType.fromType(udt.types.get(i), version), version);
+ }
+ return size;
default:
return 0;
}
}
- public static Pair<DataType, Object> fromType(AbstractType type)
+ public static Pair<DataType, Object> fromType(AbstractType type, int version)
{
// For CQL3 clients, ReversedType is an implementation detail and they
// shouldn't have to care about it.
@@ -170,6 +207,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
return Pair.<DataType, Object>create(SET, ((SetType)type).elements);
}
}
+
+ if (type instanceof UserType && version >= 3)
+ return Pair.<DataType, Object>create(UDT, type);
+
return Pair.<DataType, Object>create(CUSTOM, type.toString());
}
else
@@ -193,6 +234,8 @@ public enum DataType implements OptionCodec.Codecable<DataType>
case MAP:
List<AbstractType> l = (List<AbstractType>)entry.right;
return MapType.getInstance(l.get(0), l.get(1));
+ case UDT:
+ return (AbstractType)entry.right;
default:
return entry.left.type;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 242ad64..7ec026e 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
public abstract class Event
@@ -33,33 +34,33 @@ public abstract class Event
this.type = type;
}
- public static Event deserialize(ByteBuf cb)
+ public static Event deserialize(ByteBuf cb, int version)
{
switch (CBUtil.readEnumValue(Type.class, cb))
{
case TOPOLOGY_CHANGE:
- return TopologyChange.deserializeEvent(cb);
+ return TopologyChange.deserializeEvent(cb, version);
case STATUS_CHANGE:
- return StatusChange.deserializeEvent(cb);
+ return StatusChange.deserializeEvent(cb, version);
case SCHEMA_CHANGE:
- return SchemaChange.deserializeEvent(cb);
+ return SchemaChange.deserializeEvent(cb, version);
}
throw new AssertionError();
}
- public void serialize(ByteBuf dest)
+ public void serialize(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(type, dest);
- serializeEvent(dest);
+ serializeEvent(dest, version);
}
- public int serializedSize()
+ public int serializedSize(int version)
{
- return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+ return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version);
}
- protected abstract void serializeEvent(ByteBuf dest);
- protected abstract int eventSerializedSize();
+ protected abstract void serializeEvent(ByteBuf dest, int version);
+ protected abstract int eventSerializedSize(int version);
public static class TopologyChange extends Event
{
@@ -91,20 +92,20 @@ public abstract class Event
}
// Assumes the type has already been deserialized
- private static TopologyChange deserializeEvent(ByteBuf cb)
+ private static TopologyChange deserializeEvent(ByteBuf cb, int version)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
InetSocketAddress node = CBUtil.readInet(cb);
return new TopologyChange(change, node);
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(change, dest);
CBUtil.writeInet(node, dest);
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
}
@@ -114,6 +115,23 @@ public abstract class Event
{
return change + " " + node;
}
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(change, node);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof TopologyChange))
+ return false;
+
+ TopologyChange tpc = (TopologyChange)other;
+ return Objects.equal(change, tpc.change)
+ && Objects.equal(node, tpc.node);
+ }
}
public static class StatusChange extends Event
@@ -141,20 +159,20 @@ public abstract class Event
}
// Assumes the type has already been deserialized
- private static StatusChange deserializeEvent(ByteBuf cb)
+ private static StatusChange deserializeEvent(ByteBuf cb, int version)
{
Status status = CBUtil.readEnumValue(Status.class, cb);
InetSocketAddress node = CBUtil.readInet(cb);
return new StatusChange(status, node);
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(status, dest);
CBUtil.writeInet(node, dest);
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
}
@@ -164,56 +182,130 @@ public abstract class Event
{
return status + " " + node;
}
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(status, node);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof StatusChange))
+ return false;
+
+ StatusChange stc = (StatusChange)other;
+ return Objects.equal(status, stc.status)
+ && Objects.equal(node, stc.node);
+ }
}
public static class SchemaChange extends Event
{
public enum Change { CREATED, UPDATED, DROPPED }
+ public enum Target { KEYSPACE, TABLE, TYPE }
public final Change change;
+ public final Target target;
public final String keyspace;
- public final String table;
+ public final String tableOrType;
- public SchemaChange(Change change, String keyspace, String table)
+ public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
{
super(Type.SCHEMA_CHANGE);
this.change = change;
+ this.target = target;
this.keyspace = keyspace;
- this.table = table;
+ this.tableOrType = tableOrType;
}
public SchemaChange(Change change, String keyspace)
{
- this(change, keyspace, "");
+ this(change, Target.KEYSPACE, keyspace, null);
}
// Assumes the type has already been deserialized
- private static SchemaChange deserializeEvent(ByteBuf cb)
+ private static SchemaChange deserializeEvent(ByteBuf cb, int version)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
- String keyspace = CBUtil.readString(cb);
- String table = CBUtil.readString(cb);
- return new SchemaChange(change, keyspace, table);
+ if (version >= 3)
+ {
+ Target target = CBUtil.readEnumValue(Target.class, cb);
+ String keyspace = CBUtil.readString(cb);
+ String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
+ return new SchemaChange(change, target, keyspace, tableOrType);
+ }
+ else
+ {
+ String keyspace = CBUtil.readString(cb);
+ String table = CBUtil.readString(cb);
+ return new SchemaChange(change, table.isEmpty() ? Target.KEYSPACE : Target.TABLE, keyspace, table.isEmpty() ? null : table);
+ }
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
- CBUtil.writeEnumValue(change, dest);
- CBUtil.writeString(keyspace, dest);
- CBUtil.writeString(table, dest);
+ if (version >= 3)
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeEnumValue(target, dest);
+ CBUtil.writeString(keyspace, dest);
+ if (target != Target.KEYSPACE)
+ CBUtil.writeString(tableOrType, dest);
+ }
+ else
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+ }
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
- return CBUtil.sizeOfEnumValue(change)
- + CBUtil.sizeOfString(keyspace)
- + CBUtil.sizeOfString(table);
+ if (version >= 3)
+ {
+ int size = CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfEnumValue(target)
+ + CBUtil.sizeOfString(keyspace);
+
+ if (target != Target.KEYSPACE)
+ size += CBUtil.sizeOfString(tableOrType);
+
+ return size;
+ }
+ else
+ {
+ return CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+ }
}
@Override
public String toString()
{
- return change + " " + keyspace + (table.isEmpty() ? "" : "." + table);
+ return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(change, target, keyspace, tableOrType);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof SchemaChange))
+ return false;
+
+ SchemaChange scc = (SchemaChange)other;
+ return Objects.equal(change, scc.change)
+ && Objects.equal(target, scc.target)
+ && Objects.equal(keyspace, scc.keyspace)
+ && Objects.equal(tableOrType, scc.tableOrType);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 9b82bda..ec2a1fa 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -32,9 +32,9 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
{
public int getId();
- public Object readValue(ByteBuf cb);
- public void writeValue(Object value, ByteBuf cb);
- public int serializedValueSize(Object obj);
+ public Object readValue(ByteBuf cb, int version);
+ public void writeValue(Object value, ByteBuf cb, int version);
+ public int serializedValueSize(Object obj, int version);
}
private final Class<T> klass;
@@ -66,14 +66,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
return opt;
}
- public Map<T, Object> decode(ByteBuf body)
+ public Map<T, Object> decode(ByteBuf body, int version)
{
EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
int n = body.readUnsignedShort();
for (int i = 0; i < n; i++)
{
T opt = fromId(body.readUnsignedShort());
- Object value = opt.readValue(body);
+ Object value = opt.readValue(body, version);
if (options.containsKey(opt))
throw new ProtocolException(String.format("Duplicate option %s in message", opt.name()));
options.put(opt, value);
@@ -81,41 +81,41 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
return options;
}
- public ByteBuf encode(Map<T, Object> options)
+ public ByteBuf encode(Map<T, Object> options, int version)
{
int optLength = 2;
for (Map.Entry<T, Object> entry : options.entrySet())
- optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
+ optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version);
ByteBuf cb = Unpooled.buffer(optLength);
cb.writeShort(options.size());
for (Map.Entry<T, Object> entry : options.entrySet())
{
T opt = entry.getKey();
cb.writeShort(opt.getId());
- opt.writeValue(entry.getValue(), cb);
+ opt.writeValue(entry.getValue(), cb, version);
}
return cb;
}
- public Pair<T, Object> decodeOne(ByteBuf body)
+ public Pair<T, Object> decodeOne(ByteBuf body, int version)
{
T opt = fromId(body.readUnsignedShort());
- Object value = opt.readValue(body);
+ Object value = opt.readValue(body, version);
return Pair.create(opt, value);
}
- public void writeOne(Pair<T, Object> option, ByteBuf dest)
+ public void writeOne(Pair<T, Object> option, ByteBuf dest, int version)
{
T opt = option.left;
Object obj = option.right;
dest.writeShort(opt.getId());
- opt.writeValue(obj, dest);
+ opt.writeValue(obj, dest, version);
}
- public int oneSerializedSize(Pair<T, Object> option)
+ public int oneSerializedSize(Pair<T, Object> option, int version)
{
T opt = option.left;
Object obj = option.right;
- return 2 + opt.serializedValueSize(obj);
+ return 2 + opt.serializedValueSize(obj, version);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8d08ffd..eb2b043 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -378,7 +378,12 @@ public class Server implements CassandraDaemon.Server
public void onCreateColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onCreateUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onUpdateKeyspace(String ksName)
@@ -388,7 +393,12 @@ public class Server implements CassandraDaemon.Server
public void onUpdateColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onUpdateUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onDropKeyspace(String ksName)
@@ -398,7 +408,12 @@ public class Server implements CassandraDaemon.Server
public void onDropColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onDropUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ef56882..3cf9b7b 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -157,7 +157,7 @@ public class SimpleClient
public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
{
- Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values)));
+ Message.Response msg = execute(new QueryMessage(query, QueryOptions.forInternalCalls(consistencyLevel, values)));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
@@ -171,7 +171,7 @@ public class SimpleClient
public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values)));
+ Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values)));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}