You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/09/10 16:13:54 UTC
git commit: Remove 1.2 network compatibility code
Updated Branches:
refs/heads/trunk b21a0dab5 -> 4f5242cfb
Remove 1.2 network compatibility code
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-5960
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f5242cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f5242cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f5242cf
Branch: refs/heads/trunk
Commit: 4f5242cfbfb302e8099cb514ea78f134fef84d45
Parents: b21a0da
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Sep 10 17:13:01 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Sep 10 17:13:01 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/RangeSliceCommand.java | 130 ++++---------------
.../org/apache/cassandra/db/ReadCommand.java | 46 +------
.../cassandra/db/SliceByNamesReadCommand.java | 102 +++------------
.../cassandra/db/SliceFromReadCommand.java | 89 +++----------
.../apache/cassandra/db/filter/QueryPath.java | 111 ----------------
.../cassandra/net/IncomingTcpConnection.java | 15 +--
.../apache/cassandra/service/StorageProxy.java | 2 +-
8 files changed, 73 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1440cc..1f20fa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,8 @@
* change logging from log4j to logback (CASSANDRA-5883)
* switch to LZ4 compression for internode communication (CASSANDRA-5887)
* Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+
2.0.1
* Improve error message when yaml contains invalid properties (CASSANDRA-5958)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 5e8788c..28b86f8 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.base.Objects;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
@@ -46,37 +45,37 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
public final boolean isPaging;
public RangeSliceCommand(String keyspace,
- String column_family,
+ String columnFamily,
long timestamp,
IDiskAtomFilter predicate,
AbstractBounds<RowPosition> range,
int maxResults)
{
- this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false);
+ this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false);
}
public RangeSliceCommand(String keyspace,
- String column_family,
+ String columnFamily,
long timestamp,
IDiskAtomFilter predicate,
AbstractBounds<RowPosition> range,
List<IndexExpression> row_filter,
int maxResults)
{
- this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, false);
+ this(keyspace, columnFamily, timestamp, predicate, range, row_filter, maxResults, false, false);
}
public RangeSliceCommand(String keyspace,
- String column_family,
+ String columnFamily,
long timestamp,
IDiskAtomFilter predicate,
AbstractBounds<RowPosition> range,
- List<IndexExpression> row_filter,
+ List<IndexExpression> rowFilter,
int maxResults,
boolean countCQL3Rows,
boolean isPaging)
{
- super(keyspace, column_family, timestamp, range, predicate, row_filter);
+ super(keyspace, columnFamily, timestamp, range, predicate, rowFilter);
this.maxResults = maxResults;
this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
@@ -84,7 +83,7 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
public MessageOut<RangeSliceCommand> createMessage()
{
- return new MessageOut<RangeSliceCommand>(MessagingService.Verb.RANGE_SLICE, this, serializer);
+ return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
}
public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
@@ -137,16 +136,16 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
@Override
public String toString()
{
- return "RangeSliceCommand{" +
- "keyspace='" + keyspace + '\'' +
- ", columnFamily='" + columnFamily + '\'' +
- ", timestamp=" + timestamp +
- ", predicate=" + predicate +
- ", range=" + keyRange +
- ", rowFilter =" + rowFilter +
- ", maxResults=" + maxResults +
- ", countCQL3Rows=" + countCQL3Rows +
- "}";
+ return Objects.toStringHelper(this)
+ .add("keyspace", keyspace)
+ .add("columnFamily", columnFamily)
+ .add("predicate", predicate)
+ .add("keyRange", keyRange)
+ .add("rowFilter", rowFilter)
+ .add("maxResults", maxResults)
+ .add("counterCQL3Rows", countCQL3Rows)
+ .add("timestamp", timestamp)
+ .toString();
}
}
@@ -156,31 +155,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
{
out.writeUTF(sliceCommand.keyspace);
out.writeUTF(sliceCommand.columnFamily);
+ out.writeLong(sliceCommand.timestamp);
- if (version >= MessagingService.VERSION_20)
- out.writeLong(sliceCommand.timestamp);
-
- IDiskAtomFilter filter = sliceCommand.predicate;
- if (version < MessagingService.VERSION_20)
- {
- // Pre-2.0, we need to know if it's a super column. If it is, we
- // must extract the super column name from the predicate (and
- // modify the predicate accordingly)
- ByteBuffer sc = null;
- CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.columnFamily);
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
- sc = scFilter.scName;
- filter = scFilter.updatedFilter;
- }
-
- out.writeInt(sc == null ? 0 : sc.remaining());
- if (sc != null)
- ByteBufferUtil.write(sc, out);
- }
-
- IDiskAtomFilter.Serializer.instance.serialize(filter, out, version);
+ IDiskAtomFilter.Serializer.instance.serialize(sliceCommand.predicate, out, version);
if (sliceCommand.rowFilter == null)
{
@@ -206,47 +183,15 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
{
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
-
- long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+ long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- IDiskAtomFilter predicate;
- if (version < MessagingService.VERSION_20)
- {
- int scLength = in.readInt();
- ByteBuffer superColumn = null;
- if (scLength > 0)
- {
- byte[] buf = new byte[scLength];
- in.readFully(buf);
- superColumn = ByteBuffer.wrap(buf);
- }
-
- AbstractType<?> comparator;
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- CompositeType type = (CompositeType)metadata.comparator;
- comparator = superColumn == null ? type.types.get(0) : type.types.get(1);
- }
- else
- {
- comparator = metadata.comparator;
- }
-
- predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, comparator);
-
- if (metadata.cfType == ColumnFamilyType.Super)
- predicate = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, superColumn, predicate);
- }
- else
- {
- predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
- }
+ IDiskAtomFilter predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
List<IndexExpression> rowFilter;
int filterCount = in.readInt();
- rowFilter = new ArrayList<IndexExpression>(filterCount);
+ rowFilter = new ArrayList<>(filterCount);
for (int i = 0; i < filterCount; i++)
{
IndexExpression expr;
@@ -267,32 +212,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
{
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
-
- if (version >= MessagingService.VERSION_20)
- size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+ size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
IDiskAtomFilter filter = rsc.predicate;
- if (version < MessagingService.VERSION_20)
- {
- ByteBuffer sc = null;
- CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
- sc = scFilter.scName;
- filter = scFilter.updatedFilter;
- }
-
- if (sc != null)
- {
- size += TypeSizes.NATIVE.sizeof(sc.remaining());
- size += sc.remaining();
- }
- else
- {
- size += TypeSizes.NATIVE.sizeof(0);
- }
- }
size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index cadcd7d..b6f954e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -22,13 +22,10 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -60,7 +57,7 @@ public abstract class ReadCommand implements IReadCommand, Pageable
public MessageOut<ReadCommand> createMessage()
{
- return new MessageOut<ReadCommand>(MessagingService.Verb.READ, this, serializer);
+ return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
}
public final String ksName;
@@ -135,31 +132,14 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
public void serialize(ReadCommand command, DataOutput out, int version) throws IOException
{
- // For super columns, when talking to an older node, we need to translate the filter used.
- // That translation can change the filter type (names -> slice), and so change the command type.
- // Hence we need to detect that early on, before we've written the command type.
- ReadCommand newCommand = command;
- ByteBuffer superColumn = null;
- if (version < MessagingService.VERSION_20)
- {
- CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
- newCommand.setDigestQuery(command.isDigestQuery());
- superColumn = scFilter.scName;
- }
- }
-
- out.writeByte(newCommand.commandType.serializedValue);
+ out.writeByte(command.commandType.serializedValue);
switch (command.commandType)
{
case GET_BY_NAMES:
- SliceByNamesReadCommand.serializer.serialize(newCommand, superColumn, out, version);
+ SliceByNamesReadCommand.serializer.serialize(command, out, version);
break;
case GET_SLICES:
- SliceFromReadCommand.serializer.serialize(newCommand, superColumn, out, version);
+ SliceFromReadCommand.serializer.serialize(command, out, version);
break;
default:
throw new AssertionError();
@@ -182,26 +162,12 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
public long serializedSize(ReadCommand command, int version)
{
- ReadCommand newCommand = command;
- ByteBuffer superColumn = null;
- if (version < MessagingService.VERSION_20)
- {
- CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
- newCommand.setDigestQuery(command.isDigestQuery());
- superColumn = scFilter.scName;
- }
- }
-
switch (command.commandType)
{
case GET_BY_NAMES:
- return 1 + SliceByNamesReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+ return 1 + SliceByNamesReadCommand.serializer.serializedSize(command, version);
case GET_SLICES:
- return 1 + SliceFromReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+ return 1 + SliceFromReadCommand.serializer.serializedSize(command, version);
default:
throw new AssertionError();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index ae3db78..60487c8 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
+import com.google.common.base.Objects;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -58,13 +57,13 @@ public class SliceByNamesReadCommand extends ReadCommand
@Override
public String toString()
{
- return "SliceByNamesReadCommand(" +
- "keyspace='" + ksName + '\'' +
- ", key=" + ByteBufferUtil.bytesToHex(key) +
- ", cfName='" + cfName + '\'' +
- ", timestamp='" + timestamp + '\'' +
- ", filter=" + filter +
- ')';
+ return Objects.toStringHelper(this)
+ .add("ksName", ksName)
+ .add("cfName", cfName)
+ .add("key", ByteBufferUtil.bytesToHex(key))
+ .add("filter", filter)
+ .add("timestamp", timestamp)
+ .toString();
}
public IDiskAtomFilter filter()
@@ -77,24 +76,12 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
{
public void serialize(ReadCommand cmd, DataOutput out, int version) throws IOException
{
- serialize(cmd, null, out, version);
- }
-
- public void serialize(ReadCommand cmd, ByteBuffer superColumn, DataOutput out, int version) throws IOException
- {
SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
out.writeBoolean(command.isDigestQuery());
out.writeUTF(command.ksName);
ByteBufferUtil.writeWithShortLength(command.key, out);
-
- if (version < MessagingService.VERSION_20)
- new QueryPath(command.cfName, superColumn).serialize(out);
- else
- out.writeUTF(command.cfName);
-
- if (version >= MessagingService.VERSION_20)
- out.writeLong(cmd.timestamp);
-
+ out.writeUTF(command.cfName);
+ out.writeLong(cmd.timestamp);
NamesQueryFilter.serializer.serialize(command.filter, out, version);
}
@@ -103,65 +90,17 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
boolean isDigest = in.readBoolean();
String keyspaceName = in.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-
- String cfName;
- ByteBuffer sc = null;
- if (version < MessagingService.VERSION_20)
- {
- QueryPath path = QueryPath.deserialize(in);
- cfName = path.columnFamilyName;
- sc = path.superColumnName;
- }
- else
- {
- cfName = in.readUTF();
- }
-
- long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
-
+ String cfName = in.readUTF();
+ long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- ReadCommand command;
- if (version < MessagingService.VERSION_20)
- {
- AbstractType<?> comparator;
- if (metadata.cfType == ColumnFamilyType.Super)
- {
- CompositeType type = (CompositeType)metadata.comparator;
- comparator = sc == null ? type.types.get(0) : type.types.get(1);
- }
- else
- {
- comparator = metadata.comparator;
- }
-
- IDiskAtomFilter filter = NamesQueryFilter.serializer.deserialize(in, version, comparator);
-
- if (metadata.cfType == ColumnFamilyType.Super)
- filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, sc, filter);
-
- // Due to SC compat, it's possible we get back a slice filter at this point
- if (filter instanceof NamesQueryFilter)
- command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, (NamesQueryFilter)filter);
- else
- command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, (SliceQueryFilter)filter);
- }
- else
- {
- NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
- command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
- }
-
+ NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
+ ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
command.setDigestQuery(isDigest);
return command;
}
public long serializedSize(ReadCommand cmd, int version)
{
- return serializedSize(cmd, null, version);
- }
-
- public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
- {
TypeSizes sizes = TypeSizes.NATIVE;
SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
int size = sizes.sizeof(command.isDigestQuery());
@@ -169,15 +108,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
size += sizes.sizeof(command.ksName);
size += sizes.sizeof((short)keySize) + keySize;
-
- if (version < MessagingService.VERSION_20)
- size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- else
- size += sizes.sizeof(command.cfName);
-
- if (version >= MessagingService.VERSION_20)
- size += sizes.sizeof(cmd.timestamp);
-
+ size += sizes.sizeof(command.cfName);
+ size += sizes.sizeof(cmd.timestamp);
size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 7526796..72de2ca 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -22,26 +22,18 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SliceFromReadCommand extends ReadCommand
{
- static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
-
static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
public final SliceQueryFilter filter;
@@ -124,13 +116,13 @@ public class SliceFromReadCommand extends ReadCommand
@Override
public String toString()
{
- return "SliceFromReadCommand(" +
- "keyspace='" + ksName + '\'' +
- ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
- ", cfName='" + cfName + '\'' +
- ", timestamp='" + timestamp + '\'' +
- ", filter='" + filter + '\'' +
- ')';
+ return Objects.toStringHelper(this)
+ .add("ksName", ksName)
+ .add("cfName", cfName)
+ .add("key", ByteBufferUtil.bytesToHex(key))
+ .add("filter", filter)
+ .add("timestamp", timestamp)
+ .toString();
}
}
@@ -138,24 +130,12 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
{
public void serialize(ReadCommand rm, DataOutput out, int version) throws IOException
{
- serialize(rm, null, out, version);
- }
-
- public void serialize(ReadCommand rm, ByteBuffer superColumn, DataOutput out, int version) throws IOException
- {
SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
out.writeBoolean(realRM.isDigestQuery());
out.writeUTF(realRM.ksName);
ByteBufferUtil.writeWithShortLength(realRM.key, out);
-
- if (version < MessagingService.VERSION_20)
- new QueryPath(realRM.cfName, superColumn).serialize(out);
- else
- out.writeUTF(realRM.cfName);
-
- if (version >= MessagingService.VERSION_20)
- out.writeLong(realRM.timestamp);
-
+ out.writeUTF(realRM.cfName);
+ out.writeLong(realRM.timestamp);
SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
}
@@ -164,36 +144,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
boolean isDigest = in.readBoolean();
String keyspaceName = in.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-
- String cfName;
- ByteBuffer sc = null;
- if (version < MessagingService.VERSION_20)
- {
- QueryPath path = QueryPath.deserialize(in);
- cfName = path.columnFamilyName;
- sc = path.superColumnName;
- }
- else
- {
- cfName = in.readUTF();
- }
-
- long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- SliceQueryFilter filter;
- if (version < MessagingService.VERSION_20)
- {
- filter = SliceQueryFilter.serializer.deserialize(in, version);
-
- if (metadata.cfType == ColumnFamilyType.Super)
- filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, sc, filter);
- }
- else
- {
- filter = SliceQueryFilter.serializer.deserialize(in, version);
- }
-
+ String cfName = in.readUTF();
+ long timestamp = in.readLong();
+ SliceQueryFilter filter = SliceQueryFilter.serializer.deserialize(in, version);
ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
command.setDigestQuery(isDigest);
return command;
@@ -201,11 +154,6 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
public long serializedSize(ReadCommand cmd, int version)
{
- return serializedSize(cmd, null, version);
- }
-
- public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
- {
TypeSizes sizes = TypeSizes.NATIVE;
SliceFromReadCommand command = (SliceFromReadCommand) cmd;
int keySize = command.key.remaining();
@@ -213,15 +161,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
size += sizes.sizeof(command.ksName);
size += sizes.sizeof((short) keySize) + keySize;
-
- if (version < MessagingService.VERSION_20)
- size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- else
- size += sizes.sizeof(command.cfName);
-
- if (version >= MessagingService.VERSION_20)
- size += sizes.sizeof(cmd.timestamp);
-
+ size += sizes.sizeof(command.cfName);
+ size += sizes.sizeof(cmd.timestamp);
size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/filter/QueryPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java
deleted file mode 100644
index 26d15a1..0000000
--- a/src/java/org/apache/cassandra/db/filter/QueryPath.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * This class is obsolete internally, but kept for wire compatibility with
- * older nodes. I.e. we kept it only for the serialization part.
- */
-public class QueryPath
-{
- public final String columnFamilyName;
- public final ByteBuffer superColumnName;
- public final ByteBuffer columnName;
-
- public QueryPath(String columnFamilyName, ByteBuffer superColumnName, ByteBuffer columnName)
- {
- this.columnFamilyName = columnFamilyName;
- this.superColumnName = superColumnName;
- this.columnName = columnName;
- }
-
- public QueryPath(String columnFamilyName, ByteBuffer superColumnName)
- {
- this(columnFamilyName, superColumnName, null);
- }
-
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + "(" +
- "columnFamilyName='" + columnFamilyName + '\'' +
- ", superColumnName='" + superColumnName + '\'' +
- ", columnName='" + columnName + '\'' +
- ')';
- }
-
- public void serialize(DataOutput out) throws IOException
- {
- assert !"".equals(columnFamilyName);
- assert superColumnName == null || superColumnName.remaining() > 0;
- assert columnName == null || columnName.remaining() > 0;
- out.writeUTF(columnFamilyName == null ? "" : columnFamilyName);
- ByteBufferUtil.writeWithShortLength(superColumnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : superColumnName, out);
- ByteBufferUtil.writeWithShortLength(columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName, out);
- }
-
- public static QueryPath deserialize(DataInput din) throws IOException
- {
- String cfName = din.readUTF();
- ByteBuffer scName = ByteBufferUtil.readWithShortLength(din);
- ByteBuffer cName = ByteBufferUtil.readWithShortLength(din);
- return new QueryPath(cfName.isEmpty() ? null : cfName,
- scName.remaining() == 0 ? null : scName,
- cName.remaining() == 0 ? null : cName);
- }
-
- public int serializedSize(TypeSizes typeSizes)
- {
- int size = 0;
-
- if (columnFamilyName == null)
- size += typeSizes.sizeof((short) 0);
- else
- size += typeSizes.sizeof(columnFamilyName);
-
- if (superColumnName == null)
- {
- size += typeSizes.sizeof((short) 0);
- }
- else
- {
- int scNameSize = superColumnName.remaining();
- size += typeSizes.sizeof((short) scNameSize);
- size += scNameSize;
- }
-
- if (columnName == null)
- {
- size += typeSizes.sizeof((short) 0);
- }
- else
- {
- int cNameSize = columnName.remaining();
- size += typeSizes.sizeof((short) cNameSize);
- size += cNameSize;
- }
-
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 3b0bc8f..3524a69 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -73,10 +73,12 @@ public class IncomingTcpConnection extends Thread
{
try
{
- if (version < MessagingService.VERSION_12)
- handleLegacyVersion();
- else
- handleModernVersion();
+ if (version < MessagingService.VERSION_20)
+ throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; "
+ + "The earliest version supported is 2.0.0",
+ version));
+
+ handleModernVersion();
}
catch (EOFException e)
{
@@ -141,11 +143,6 @@ public class IncomingTcpConnection extends Thread
}
}
- private void handleLegacyVersion()
- {
- throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0");
- }
-
private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
{
int id;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 50dfd07..cff4b02 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -837,7 +837,7 @@ public class StorageProxy implements StorageProxyMBean
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
// direct writes to local DC or old Cassandra versions
// (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
- if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
+ if (localDataCenter.equals(dc))
{
MessagingService.instance().sendRR(message, destination, responseHandler);
}