You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/03 17:29:56 UTC
[2/3] git commit: Merge branch 'cassandra-1.1' into trunk
Merge branch 'cassandra-1.1' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/QueryProcessor.java
src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe57553c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe57553c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe57553c
Branch: refs/heads/trunk
Commit: fe57553cf61f5f3aff510c6c80f6502b42dc7fef
Parents: 88ae22f 0e0213b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Oct 3 17:28:33 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Oct 3 17:28:33 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/cql3/ResultSet.java | 5 ++++-
.../apache/cassandra/db/marshal/ReversedType.java | 3 +--
.../org/apache/cassandra/transport/DataType.java | 5 +++++
4 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe57553c/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe57553c/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/ResultSet.java
index 2b51469,0000000..66b0c5a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@@ -1,340 -1,0 +1,343 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.LongType;
++import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CqlMetadata;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ResultSet
+{
+ public static final Codec codec = new Codec();
+ private static final ColumnIdentifier COUNT_COLUMN = new ColumnIdentifier("count", false);
+
+ public final Metadata metadata;
+ public final List<List<ByteBuffer>> rows;
+
+ public ResultSet(List<ColumnSpecification> metadata)
+ {
+ this(new Metadata(metadata), new ArrayList<List<ByteBuffer>>());
+ }
+
+ private ResultSet(Metadata metadata, List<List<ByteBuffer>> rows)
+ {
+ this.metadata = metadata;
+ this.rows = rows;
+ }
+
+ public int size()
+ {
+ return rows.size();
+ }
+
+ public void addColumnValue(ByteBuffer value)
+ {
+ if (rows.isEmpty() || lastRow().size() == metadata.names.size())
+ rows.add(new ArrayList<ByteBuffer>(metadata.names.size()));
+
+ lastRow().add(value);
+ }
+
+ private List<ByteBuffer> lastRow()
+ {
+ return rows.get(rows.size() - 1);
+ }
+
+ public void reverse()
+ {
+ Collections.reverse(rows);
+ }
+
+ public void trim(int limit)
+ {
+ int toRemove = rows.size() - limit;
+ if (toRemove > 0)
+ {
+ for (int i = 0; i < toRemove; i++)
+ rows.remove(rows.size() - 1);
+ }
+ }
+
+ public ResultSet makeCountResult()
+ {
+ String ksName = metadata.names.get(0).ksName;
+ String cfName = metadata.names.get(0).cfName;
+ metadata.names.clear();
+ metadata.names.add(new ColumnSpecification(ksName, cfName, COUNT_COLUMN, LongType.instance));
+
+ long count = rows.size();
+ rows.clear();
+ rows.add(Collections.singletonList(ByteBufferUtil.bytes(count)));
+ return this;
+ }
+
+ public CqlResult toThriftResult()
+ {
+ String UTF8 = "UTF8Type";
+ CqlMetadata schema = new CqlMetadata(new HashMap<ByteBuffer, String>(),
+ new HashMap<ByteBuffer, String>(),
+ // The 2 following ones shouldn't be needed in CQL3
+ UTF8, UTF8);
+
+ for (ColumnSpecification name : metadata.names)
+ {
+ ByteBuffer colName = ByteBufferUtil.bytes(name.toString());
+ schema.name_types.put(colName, UTF8);
- schema.value_types.put(colName, name.type.toString());
++ AbstractType<?> normalizedType = name.type instanceof ReversedType ? ((ReversedType)name.type).baseType : name.type;
++ schema.value_types.put(colName, normalizedType.toString());
++
+ }
+
+ List<CqlRow> cqlRows = new ArrayList<CqlRow>(rows.size());
+ for (List<ByteBuffer> row : rows)
+ {
+ List<Column> thriftCols = new ArrayList<Column>(metadata.names.size());
+ for (int i = 0; i < metadata.names.size(); i++)
+ {
+ Column col = new Column(ByteBufferUtil.bytes(metadata.names.get(i).toString()));
+ col.setValue(row.get(i));
+ thriftCols.add(col);
+ }
+ // The key of CqlRow shoudn't be needed in CQL3
+ cqlRows.add(new CqlRow(ByteBufferUtil.EMPTY_BYTE_BUFFER, thriftCols));
+ }
+ CqlResult res = new CqlResult(CqlResultType.ROWS);
+ res.setRows(cqlRows).setSchema(schema);
+ return res;
+ }
+
+ @Override
+ public String toString()
+ {
+ try
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(metadata).append('\n');
+ for (List<ByteBuffer> row : rows)
+ {
+ for (int i = 0; i < row.size(); i++)
+ {
+ ByteBuffer v = row.get(i);
+ if (v == null)
+ sb.append(" | null");
+ else
+ sb.append(" | ").append(metadata.names.get(i).type.getString(v));
+ }
+ sb.append('\n');
+ }
+ sb.append("---");
+ return sb.toString();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class Codec implements CBCodec<ResultSet>
+ {
+ /*
+ * Format:
+ * - metadata
+ * - rows count (4 bytes)
+ * - rows
+ */
+ public ResultSet decode(ChannelBuffer body)
+ {
+ Metadata m = Metadata.codec.decode(body);
+ int rowCount = body.readInt();
+ ResultSet rs = new ResultSet(m, new ArrayList<List<ByteBuffer>>(rowCount));
+
+ // rows
+ int totalValues = rowCount * m.names.size();
+ for (int i = 0; i < totalValues; i++)
+ rs.addColumnValue(CBUtil.readValue(body));
+
+ return rs;
+ }
+
+ public ChannelBuffer encode(ResultSet rs)
+ {
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.names.size() * rs.rows.size());
+ builder.add(Metadata.codec.encode(rs.metadata));
+ builder.add(CBUtil.intToCB(rs.rows.size()));
+
+ for (List<ByteBuffer> row : rs.rows)
+ {
+ for (ByteBuffer bb : row)
+ builder.addValue(bb);
+ }
+
+ return builder.build();
+ }
+ }
+
+ public static class Metadata
+ {
+ public static final CBCodec<Metadata> codec = new Codec();
+
+ public final EnumSet<Flag> flags;
+ public final List<ColumnSpecification> names;
+
+ public Metadata(List<ColumnSpecification> names)
+ {
+ this(EnumSet.noneOf(Flag.class), names);
+ if (!names.isEmpty() && allInSameCF())
+ flags.add(Flag.GLOBAL_TABLES_SPEC);
+ }
+
+ private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names)
+ {
+ this.flags = flags;
+ this.names = names;
+ }
+
+ private boolean allInSameCF()
+ {
+ assert !names.isEmpty();
+
+ Iterator<ColumnSpecification> iter = names.iterator();
+ ColumnSpecification first = iter.next();
+ while (iter.hasNext())
+ {
+ ColumnSpecification name = iter.next();
+ if (!name.ksName.equals(first.ksName) || !name.cfName.equals(first.cfName))
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ for (ColumnSpecification name : names)
+ {
+ sb.append("[").append(name.toString());
+ sb.append("(").append(name.ksName).append(", ").append(name.cfName).append(")");
+ sb.append(", ").append(name.type).append("]");
+ }
+ return sb.toString();
+ }
+
+ private static class Codec implements CBCodec<Metadata>
+ {
+ public Metadata decode(ChannelBuffer body)
+ {
+ // flags & column count
+ int iflags = body.readInt();
+ int columnCount = body.readInt();
+
+ EnumSet<Flag> flags = Flag.deserialize(iflags);
+ boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
+
+ String globalKsName = null;
+ String globalCfName = null;
+ if (globalTablesSpec)
+ {
+ globalKsName = CBUtil.readString(body);
+ globalCfName = CBUtil.readString(body);
+ }
+
+ // metadata (names/types)
+ List<ColumnSpecification> names = new ArrayList<ColumnSpecification>(columnCount);
+ for (int i = 0; i < columnCount; i++)
+ {
+ String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
+ String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
+ ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
+ AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
+ names.add(new ColumnSpecification(ksName, cfName, colName, type));
+ }
+ return new Metadata(flags, names);
+ }
+
+ public ChannelBuffer encode(Metadata m)
+ {
+ boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+
+ int stringCount = globalTablesSpec ? 2 + m.names.size() : 3* m.names.size();
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + m.names.size(), stringCount, 0);
+
+ ChannelBuffer header = ChannelBuffers.buffer(8);
+ header.writeInt(Flag.serialize(m.flags));
+ header.writeInt(m.names.size());
+ builder.add(header);
+
+ if (globalTablesSpec)
+ {
+ builder.addString(m.names.get(0).ksName);
+ builder.addString(m.names.get(0).cfName);
+ }
+
+ for (ColumnSpecification name : m.names)
+ {
+ if (!globalTablesSpec)
+ {
+ builder.addString(name.ksName);
+ builder.addString(name.cfName);
+ }
+ builder.addString(name.toString());
+ builder.add(DataType.codec.encodeOne(DataType.fromType(name.type)));
+ }
+ return builder.build();
+ }
+ }
+ }
+
+ public static enum Flag
+ {
+ // The order of that enum matters!!
+ GLOBAL_TABLES_SPEC;
+
+ public static EnumSet<Flag> deserialize(int flags)
+ {
+ EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
+ Flag[] values = Flag.values();
+ for (int n = 0; n < 32; n++)
+ {
+ if ((flags & (1 << n)) != 0)
+ set.add(values[n]);
+ }
+ return set;
+ }
+
+ public static int serialize(EnumSet<Flag> flags)
+ {
+ int i = 0;
+ for (Flag flag : flags)
+ i |= 1 << flag.ordinal();
+ return i;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe57553c/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/marshal/ReversedType.java
index d5db1e7,6165f3c..40e9a84
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@@ -30,10 -30,9 +30,9 @@@ public class ReversedType<T> extends Ab
// interning instances
private static final Map<AbstractType<?>, ReversedType> instances = new HashMap<AbstractType<?>, ReversedType>();
- // package protected for unit tests sake
- final AbstractType<T> baseType;
+ public final AbstractType<T> baseType;
- public static <T> ReversedType<T> getInstance(TypeParser parser) throws ConfigurationException
+ public static <T> ReversedType<T> getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
{
List<AbstractType<?>> types = parser.getTypeParameters();
if (types.size() != 1)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe57553c/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/DataType.java
index 80528e7,0000000..21f4d03
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@@ -1,197 -1,0 +1,202 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.Pair;
+
+public enum DataType implements OptionCodec.Codecable<DataType>
+{
+ CUSTOM (0, null),
+ ASCII (1, AsciiType.instance),
+ BIGINT (2, LongType.instance),
+ BLOB (3, BytesType.instance),
+ BOOLEAN (4, BooleanType.instance),
+ COUNTER (5, CounterColumnType.instance),
+ DECIMAL (6, DecimalType.instance),
+ DOUBLE (7, DoubleType.instance),
+ FLOAT (8, FloatType.instance),
+ INT (9, Int32Type.instance),
+ TEXT (10, UTF8Type.instance),
+ TIMESTAMP(11, DateType.instance),
+ UUID (12, UUIDType.instance),
+ VARCHAR (13, UTF8Type.instance),
+ VARINT (14, IntegerType.instance),
+ TIMEUUID (15, TimeUUIDType.instance),
+ INET (16, InetAddressType.instance),
+ LIST (32, null),
+ MAP (33, null),
+ SET (34, null);
+
+ public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
+
+ private final int id;
+ private final AbstractType type;
+ private static final Map<AbstractType, DataType> dataTypeMap = new HashMap<AbstractType, DataType>();
+ static
+ {
+ for (DataType type : DataType.values())
+ {
+ if (type.type != null)
+ dataTypeMap.put(type.type, type);
+ }
+ }
+
+ private DataType(int id, AbstractType type)
+ {
+ this.id = id;
+ this.type = type;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public Object readValue(ChannelBuffer cb)
+ {
+ switch (this)
+ {
+ case CUSTOM:
+ return CBUtil.readString(cb);
+ case LIST:
+ return DataType.toType(codec.decodeOne(cb));
+ case SET:
+ return DataType.toType(codec.decodeOne(cb));
+ case MAP:
+ List<AbstractType> l = new ArrayList<AbstractType>(2);
+ l.add(DataType.toType(codec.decodeOne(cb)));
+ l.add(DataType.toType(codec.decodeOne(cb)));
+ return l;
+ default:
+ return null;
+ }
+ }
+
+ public void writeValue(Object value, ChannelBuffer cb)
+ {
+ switch (this)
+ {
+ case CUSTOM:
+ assert value instanceof String;
+ cb.writeBytes(CBUtil.stringToCB((String)value));
+ break;
+ case LIST:
+ cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+ break;
+ case SET:
+ cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+ break;
+ case MAP:
+ List<AbstractType> l = (List<AbstractType>)value;
+ cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(0))));
+ cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(1))));
+ break;
+ }
+ }
+
+ public int serializedValueSize(Object value)
+ {
+ switch (this)
+ {
+ case CUSTOM:
+ return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
+ case LIST:
+ case SET:
+ return codec.oneSerializedSize(DataType.fromType((AbstractType)value));
+ case MAP:
+ List<AbstractType> l = (List<AbstractType>)value;
+ int s = 0;
+ s += codec.oneSerializedSize(DataType.fromType(l.get(0)));
+ s += codec.oneSerializedSize(DataType.fromType(l.get(1)));
+ return s;
+ default:
+ return 0;
+ }
+ }
+
+ public static Pair<DataType, Object> fromType(AbstractType type)
+ {
++ // For CQL3 clients, ReversedType is an implementation detail and they
++ // shouldn't have to care about it.
++ if (type instanceof ReversedType)
++ type = ((ReversedType)type).baseType;
++
+ DataType dt = dataTypeMap.get(type);
+ if (dt == null)
+ {
+ if (type.isCollection())
+ {
+ if (type instanceof ListType)
+ {
+ return Pair.<DataType, Object>create(LIST, ((ListType)type).elements);
+ }
+ else if (type instanceof MapType)
+ {
+ MapType mt = (MapType)type;
+ return Pair.<DataType, Object>create(MAP, Arrays.asList(mt.keys, mt.values));
+ }
+ else
+ {
+ assert type instanceof SetType;
+ return Pair.<DataType, Object>create(SET, ((SetType)type).elements);
+ }
+ }
+ return Pair.<DataType, Object>create(CUSTOM, type.toString());
+ }
+ else
+ {
+ return Pair.create(dt, null);
+ }
+ }
+
+ public static AbstractType toType(Pair<DataType, Object> entry)
+ {
+ try
+ {
+ switch (entry.left)
+ {
+ case CUSTOM:
+ return TypeParser.parse((String)entry.right);
+ case LIST:
+ return ListType.getInstance((AbstractType)entry.right);
+ case SET:
+ return SetType.getInstance((AbstractType)entry.right);
+ case MAP:
+ List<AbstractType> l = (List<AbstractType>)entry.right;
+ return MapType.getInstance(l.get(0), l.get(1));
+ default:
+ return entry.left.type;
+ }
+ }
+ catch (RequestValidationException e)
+ {
+ throw new ProtocolException(e.getMessage());
+ }
+ }
+}