You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:50:02 UTC
[09/11] cassandra git commit: Remove pre-3.0 compatibility code for
4.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
deleted file mode 100644
index d6aac64..0000000
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.IndexInfo;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * Holds references on serializers that depend on the table definition.
- */
-public class Serializers
-{
- private final CFMetaData metadata;
-
- private Map<Version, IndexInfo.Serializer> otherVersionClusteringSerializers;
-
- private final IndexInfo.Serializer latestVersionIndexSerializer;
-
- public Serializers(CFMetaData metadata)
- {
- this.metadata = metadata;
- this.latestVersionIndexSerializer = new IndexInfo.Serializer(BigFormat.latestVersion,
- indexEntryClusteringPrefixSerializer(BigFormat.latestVersion, SerializationHeader.makeWithoutStats(metadata)));
- }
-
- IndexInfo.Serializer indexInfoSerializer(Version version, SerializationHeader header)
- {
- // null header indicates streaming from pre-3.0 sstables
- if (version.equals(BigFormat.latestVersion) && header != null)
- return latestVersionIndexSerializer;
-
- if (otherVersionClusteringSerializers == null)
- otherVersionClusteringSerializers = new ConcurrentHashMap<>();
- IndexInfo.Serializer serializer = otherVersionClusteringSerializers.get(version);
- if (serializer == null)
- {
- serializer = new IndexInfo.Serializer(version,
- indexEntryClusteringPrefixSerializer(version, header));
- otherVersionClusteringSerializers.put(version, serializer);
- }
- return serializer;
- }
-
- // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to
- // ClusteringPrefix.serializer directly. At which point this whole class probably becomes
- // unecessary (since IndexInfo.Serializer won't depend on the metadata either).
- private ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(Version version, SerializationHeader header)
- {
- if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables
- {
- return oldFormatSerializer(version);
- }
-
- return new NewFormatSerializer(version, header.clusteringTypes());
- }
-
- private ISerializer<ClusteringPrefix> oldFormatSerializer(Version version)
- {
- return new ISerializer<ClusteringPrefix>()
- {
- List<AbstractType<?>> clusteringTypes = SerializationHeader.makeWithoutStats(metadata).clusteringTypes();
-
- public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
- {
- //we deserialize in the old format and serialize in the new format
- ClusteringPrefix.serializer.serialize(clustering, out,
- version.correspondingMessagingVersion(),
- clusteringTypes);
- }
-
- @Override
- public void skip(DataInputPlus in) throws IOException
- {
- ByteBufferUtil.skipShortLength(in);
- }
-
- public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
- {
- // We're reading the old cellname/composite
- ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
- assert bb.hasRemaining(); // empty cellnames were invalid
-
- int clusteringSize = metadata.clusteringColumns().size();
- // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
- if (clusteringSize == 0)
- return Clustering.EMPTY;
-
- if (!metadata.isCompound())
- return Clustering.make(bb);
-
- List<ByteBuffer> components = CompositeType.splitName(bb);
- byte eoc = CompositeType.lastEOC(bb);
-
- if (eoc == 0 || components.size() >= clusteringSize)
- {
- // That's a clustering.
- if (components.size() > clusteringSize)
- components = components.subList(0, clusteringSize);
-
- return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
- }
- else
- {
- // It's a range tombstone bound. It is a start since that's the only part we've ever included
- // in the index entries.
- ClusteringPrefix.Kind boundKind = eoc > 0
- ? ClusteringPrefix.Kind.EXCL_START_BOUND
- : ClusteringPrefix.Kind.INCL_START_BOUND;
-
- return ClusteringBound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
- }
- }
-
- public long serializedSize(ClusteringPrefix clustering)
- {
- return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
- clusteringTypes);
- }
- };
- }
-
- private static class NewFormatSerializer implements ISerializer<ClusteringPrefix>
- {
- private final Version version;
- private final List<AbstractType<?>> clusteringTypes;
-
- NewFormatSerializer(Version version, List<AbstractType<?>> clusteringTypes)
- {
- this.version = version;
- this.clusteringTypes = clusteringTypes;
- }
-
- public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
- {
- ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), clusteringTypes);
- }
-
- @Override
- public void skip(DataInputPlus in) throws IOException
- {
- ClusteringPrefix.serializer.skip(in, version.correspondingMessagingVersion(), clusteringTypes);
- }
-
- public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
- {
- return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), clusteringTypes);
- }
-
- public long serializedSize(ClusteringPrefix clustering)
- {
- return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), clusteringTypes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d87d277..1d6ec7b 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -935,9 +935,9 @@ public class SinglePartitionReadCommand extends ReadCommand
nowInSec());
}
- public MessageOut<ReadCommand> createMessage(int version)
+ public MessageOut<ReadCommand> createMessage()
{
- return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer);
+ return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
}
protected void appendCQLWhereClause(StringBuilder sb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 31a461b..91adf3a 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -102,16 +102,6 @@ public final class SystemKeyspace
public static final String BUILT_VIEWS = "built_views";
public static final String PREPARED_STATEMENTS = "prepared_statements";
- @Deprecated public static final String LEGACY_HINTS = "hints";
- @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
- @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
- @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
- @Deprecated public static final String LEGACY_COLUMNS = "schema_columns";
- @Deprecated public static final String LEGACY_TRIGGERS = "schema_triggers";
- @Deprecated public static final String LEGACY_USERTYPES = "schema_usertypes";
- @Deprecated public static final String LEGACY_FUNCTIONS = "schema_functions";
- @Deprecated public static final String LEGACY_AGGREGATES = "schema_aggregates";
-
public static final CFMetaData Batches =
compile(BATCHES,
"batches awaiting replay",
@@ -288,148 +278,6 @@ public final class SystemKeyspace
+ "query_string text,"
+ "PRIMARY KEY ((prepared_id)))");
- @Deprecated
- public static final CFMetaData LegacyHints =
- compile(LEGACY_HINTS,
- "*DEPRECATED* hints awaiting delivery",
- "CREATE TABLE %s ("
- + "target_id uuid,"
- + "hint_id timeuuid,"
- + "message_version int,"
- + "mutation blob,"
- + "PRIMARY KEY ((target_id), hint_id, message_version)) "
- + "WITH COMPACT STORAGE")
- .compaction(CompactionParams.scts(singletonMap("enabled", "false")))
- .gcGraceSeconds(0);
-
- @Deprecated
- public static final CFMetaData LegacyBatchlog =
- compile(LEGACY_BATCHLOG,
- "*DEPRECATED* batchlog entries",
- "CREATE TABLE %s ("
- + "id uuid,"
- + "data blob,"
- + "version int,"
- + "written_at timestamp,"
- + "PRIMARY KEY ((id)))")
- .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
- .gcGraceSeconds(0);
-
- @Deprecated
- public static final CFMetaData LegacyKeyspaces =
- compile(LEGACY_KEYSPACES,
- "*DEPRECATED* keyspace definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "durable_writes boolean,"
- + "strategy_class text,"
- + "strategy_options text,"
- + "PRIMARY KEY ((keyspace_name))) "
- + "WITH COMPACT STORAGE");
-
- @Deprecated
- public static final CFMetaData LegacyColumnfamilies =
- compile(LEGACY_COLUMNFAMILIES,
- "*DEPRECATED* table definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "bloom_filter_fp_chance double,"
- + "caching text,"
- + "cf_id uuid," // post-2.1 UUID cfid
- + "comment text,"
- + "compaction_strategy_class text,"
- + "compaction_strategy_options text,"
- + "comparator text,"
- + "compression_parameters text,"
- + "default_time_to_live int,"
- + "default_validator text,"
- + "dropped_columns map<text, bigint>,"
- + "gc_grace_seconds int,"
- + "is_dense boolean,"
- + "key_validator text,"
- + "local_read_repair_chance double,"
- + "max_compaction_threshold int,"
- + "max_index_interval int,"
- + "memtable_flush_period_in_ms int,"
- + "min_compaction_threshold int,"
- + "min_index_interval int,"
- + "read_repair_chance double,"
- + "speculative_retry text,"
- + "subcomparator text,"
- + "type text,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name))");
-
- @Deprecated
- public static final CFMetaData LegacyColumns =
- compile(LEGACY_COLUMNS,
- "*DEPRECATED* column definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "column_name text,"
- + "component_index int,"
- + "index_name text,"
- + "index_options text,"
- + "index_type text,"
- + "type text,"
- + "validator text,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))");
-
- @Deprecated
- public static final CFMetaData LegacyTriggers =
- compile(LEGACY_TRIGGERS,
- "*DEPRECATED* trigger definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "trigger_name text,"
- + "trigger_options map<text, text>,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))");
-
- @Deprecated
- public static final CFMetaData LegacyUsertypes =
- compile(LEGACY_USERTYPES,
- "*DEPRECATED* user defined type definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "type_name text,"
- + "field_names list<text>,"
- + "field_types list<text>,"
- + "PRIMARY KEY ((keyspace_name), type_name))");
-
- @Deprecated
- public static final CFMetaData LegacyFunctions =
- compile(LEGACY_FUNCTIONS,
- "*DEPRECATED* user defined function definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "function_name text,"
- + "signature frozen<list<text>>,"
- + "argument_names list<text>,"
- + "argument_types list<text>,"
- + "body text,"
- + "language text,"
- + "return_type text,"
- + "called_on_null_input boolean,"
- + "PRIMARY KEY ((keyspace_name), function_name, signature))");
-
- @Deprecated
- public static final CFMetaData LegacyAggregates =
- compile(LEGACY_AGGREGATES,
- "*DEPRECATED* user defined aggregate definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "aggregate_name text,"
- + "signature frozen<list<text>>,"
- + "argument_types list<text>,"
- + "final_func text,"
- + "initcond blob,"
- + "return_type text,"
- + "state_func text,"
- + "state_type text,"
- + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
-
private static CFMetaData compile(String name, String description, String schema)
{
return CFMetaData.compile(String.format(schema, name), SchemaConstants.SYSTEM_KEYSPACE_NAME)
@@ -457,16 +305,7 @@ public final class SystemKeyspace
TransferredRanges,
ViewsBuildsInProgress,
BuiltViews,
- LegacyHints,
- LegacyBatchlog,
- PreparedStatements,
- LegacyKeyspaces,
- LegacyColumnfamilies,
- LegacyColumns,
- LegacyTriggers,
- LegacyUsertypes,
- LegacyFunctions,
- LegacyAggregates);
+ PreparedStatements);
}
private static Functions functions()
@@ -1131,18 +970,27 @@ public final class SystemKeyspace
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
+
+ // Note: Pre-3.0, we did not store the versions at which things were serialized. As upgrading to 3.0 is
+ // mandatory before moving to 4.0+ and the paxos table is TTLed, it's _very_ unlikely we'll ever read a proposal
+ // or most recent commit (MRC) without a version. But if we do (say gc_grace, on which the TTL is based, happens
+ // to be super large), we consider the commit too old and ignore it.
+ if (!row.has("proposal_version") || !row.has("most_recent_commit_version"))
+ return new PaxosState(key, metadata);
+
+
Commit promised = row.has("in_progress_ballot")
? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.partitionColumns(), 1))
: Commit.emptyCommit(key, metadata);
// either we have both a recently accepted ballot and update or we have neither
- int proposalVersion = row.has("proposal_version") ? row.getInt("proposal_version") : MessagingService.VERSION_21;
- Commit accepted = row.has("proposal")
- ? new Commit(row.getUUID("proposal_ballot"), PartitionUpdate.fromBytes(row.getBytes("proposal"), proposalVersion, key))
+ Commit accepted = row.has("proposal_version") && row.has("proposal")
+ ? new Commit(row.getUUID("proposal_ballot"),
+ PartitionUpdate.fromBytes(row.getBytes("proposal"), row.getInt("proposal_version")))
: Commit.emptyCommit(key, metadata);
// either most_recent_commit and most_recent_commit_at will both be set, or neither
- int mostRecentVersion = row.has("most_recent_commit_version") ? row.getInt("most_recent_commit_version") : MessagingService.VERSION_21;
- Commit mostRecent = row.has("most_recent_commit")
- ? new Commit(row.getUUID("most_recent_commit_at"), PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), mostRecentVersion, key))
+ Commit mostRecent = row.has("most_recent_commit_version") && row.has("most_recent_commit")
+ ? new Commit(row.getUUID("most_recent_commit_at"),
+ PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), row.getInt("most_recent_commit_version")))
: Commit.emptyCommit(key, metadata);
return new PaxosState(promised, accepted, mostRecent);
}
@@ -1404,45 +1252,17 @@ public final class SystemKeyspace
return result.one().getString("release_version");
}
- /**
- * Check data directories for old files that can be removed when migrating from 2.1 or 2.2 to 3.0,
- * these checks can be removed in 4.0, see CASSANDRA-7066
- */
- public static void migrateDataDirs()
- {
- Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
- for (String dataDir : dirs)
- {
- logger.trace("Checking {} for old files", dataDir);
- File dir = new File(dataDir);
- assert dir.exists() : dir + " should have been created by startup checks";
-
- for (File ksdir : dir.listFiles((d, n) -> new File(d, n).isDirectory()))
- {
- logger.trace("Checking {} for old files", ksdir);
-
- for (File cfdir : ksdir.listFiles((d, n) -> new File(d, n).isDirectory()))
- {
- logger.trace("Checking {} for old files", cfdir);
-
- if (Descriptor.isLegacyFile(cfdir))
- {
- FileUtils.deleteRecursive(cfdir);
- }
- else
- {
- FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(new File(d, n))));
- }
- }
- }
- }
- }
-
private static ByteBuffer rangeToBytes(Range<Token> range)
{
try (DataOutputBuffer out = new DataOutputBuffer())
{
- Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_22);
+ // The format with which token ranges are serialized in the system tables is the pre-3.0 serialization
+ // format for ranges, so we should maintain that for now. And while we don't really support pre-3.0
+ // messaging versions, we know AbstractBounds.Serializer still supports them _exactly_ for this use case, so
+ // we pass 0 as the version to trigger that legacy code.
+ // In the future, it might be worth switching to a stable text format for the ranges to 1) avoid relying on
+ // that legacy code and 2) be more user friendly (the serialization format we currently use is pretty custom).
+ Range.tokenSerializer.serialize(range, out, 0);
return out.buffer();
}
catch (IOException e)
@@ -1456,9 +1276,10 @@ public final class SystemKeyspace
{
try
{
+ // See rangeToBytes above for why version is 0.
return (Range<Token>) Range.tokenSerializer.deserialize(ByteStreams.newDataInput(ByteBufferUtil.getArray(rawRange)),
partitioner,
- MessagingService.VERSION_22);
+ 0);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 9e39105..c32a642 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.net.MessagingService;
* we don't do more work than necessary (i.e. we don't allocate/deserialize
* objects for things we don't care about).
*/
-public abstract class UnfilteredDeserializer
+public class UnfilteredDeserializer
{
private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class);
@@ -47,32 +47,67 @@ public abstract class UnfilteredDeserializer
protected final DataInputPlus in;
protected final SerializationHelper helper;
- protected UnfilteredDeserializer(CFMetaData metadata,
- DataInputPlus in,
- SerializationHelper helper)
+ private final ClusteringPrefix.Deserializer clusteringDeserializer;
+ private final SerializationHeader header;
+
+ private int nextFlags;
+ private int nextExtendedFlags;
+ private boolean isReady;
+ private boolean isDone;
+
+ private final Row.Builder builder;
+
+ private UnfilteredDeserializer(CFMetaData metadata,
+ DataInputPlus in,
+ SerializationHeader header,
+ SerializationHelper helper)
{
this.metadata = metadata;
this.in = in;
this.helper = helper;
+ this.header = header;
+ this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
+ this.builder = BTreeRow.sortedBuilder();
}
public static UnfilteredDeserializer create(CFMetaData metadata,
DataInputPlus in,
SerializationHeader header,
- SerializationHelper helper,
- DeletionTime partitionDeletion,
- boolean readAllAsDynamic)
+ SerializationHelper helper)
{
- if (helper.version >= MessagingService.VERSION_30)
- return new CurrentDeserializer(metadata, in, header, helper);
- else
- return new OldFormatDeserializer(metadata, in, helper, partitionDeletion, readAllAsDynamic);
+ return new UnfilteredDeserializer(metadata, in, header, helper);
}
/**
* Whether or not there is more atom to read.
*/
- public abstract boolean hasNext() throws IOException;
+ public boolean hasNext() throws IOException
+ {
+ if (isReady)
+ return true;
+
+ prepareNext();
+ return !isDone;
+ }
+
+ private void prepareNext() throws IOException
+ {
+ if (isDone)
+ return;
+
+ nextFlags = in.readUnsignedByte();
+ if (UnfilteredSerializer.isEndOfPartition(nextFlags))
+ {
+ isDone = true;
+ isReady = false;
+ return;
+ }
+
+ nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags);
+
+ clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
+ isReady = true;
+ }
/**
* Compare the provided bound to the next atom to read on disk.
@@ -81,585 +116,68 @@ public abstract class UnfilteredDeserializer
* comparison. Whenever we know what to do with this atom (read it or skip it),
* readNext or skipNext should be called.
*/
- public abstract int compareNextTo(ClusteringBound bound) throws IOException;
-
- /**
- * Returns whether the next atom is a row or not.
- */
- public abstract boolean nextIsRow() throws IOException;
-
- /**
- * Returns whether the next atom is the static row or not.
- */
- public abstract boolean nextIsStatic() throws IOException;
+ public int compareNextTo(ClusteringBound bound) throws IOException
+ {
+ if (!isReady)
+ prepareNext();
- /**
- * Returns the next atom.
- */
- public abstract Unfiltered readNext() throws IOException;
+ assert !isDone;
- /**
- * Clears any state in this deserializer.
- */
- public abstract void clearState() throws IOException;
+ return clusteringDeserializer.compareNextTo(bound);
+ }
/**
- * Skips the next atom.
+ * Returns whether the next atom is a row or not.
*/
- public abstract void skipNext() throws IOException;
+ public boolean nextIsRow() throws IOException
+ {
+ if (!isReady)
+ prepareNext();
+ return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW;
+ }
/**
- * For the legacy layout deserializer, we have to deal with the fact that a row can span multiple index blocks and that
- * the call to hasNext() reads the next element upfront. We must take that into account when we check in AbstractSSTableIterator if
- * we're past the end of an index block boundary as that check expect to account for only consumed data (that is, if hasNext has
- * been called and made us cross an index boundary but neither readNext() or skipNext() as yet been called, we shouldn't consider
- * the index block boundary crossed yet).
- *
- * TODO: we don't care about this for the current file format because a row can never span multiple index blocks (further, hasNext()
- * only just basically read 2 bytes from disk in that case). So once we drop backward compatibility with pre-3.0 sstable, we should
- * remove this.
+ * Returns the next atom.
*/
- public abstract long bytesReadForUnconsumedData();
-
- private static class CurrentDeserializer extends UnfilteredDeserializer
+ public Unfiltered readNext() throws IOException
{
- private final ClusteringPrefix.Deserializer clusteringDeserializer;
- private final SerializationHeader header;
-
- private int nextFlags;
- private int nextExtendedFlags;
- private boolean isReady;
- private boolean isDone;
-
- private final Row.Builder builder;
-
- private CurrentDeserializer(CFMetaData metadata,
- DataInputPlus in,
- SerializationHeader header,
- SerializationHelper helper)
+ isReady = false;
+ if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- super(metadata, in, helper);
- this.header = header;
- this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
- this.builder = BTreeRow.sortedBuilder();
+ ClusteringBoundOrBoundary bound = clusteringDeserializer.deserializeNextBound();
+ return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
}
-
- public boolean hasNext() throws IOException
- {
- if (isReady)
- return true;
-
- prepareNext();
- return !isDone;
- }
-
- private void prepareNext() throws IOException
- {
- if (isDone)
- return;
-
- nextFlags = in.readUnsignedByte();
- if (UnfilteredSerializer.isEndOfPartition(nextFlags))
- {
- isDone = true;
- isReady = false;
- return;
- }
-
- nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags);
-
- clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
- isReady = true;
- }
-
- public int compareNextTo(ClusteringBound bound) throws IOException
- {
- if (!isReady)
- prepareNext();
-
- assert !isDone;
-
- return clusteringDeserializer.compareNextTo(bound);
- }
-
- public boolean nextIsRow() throws IOException
- {
- if (!isReady)
- prepareNext();
-
- return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW;
- }
-
- public boolean nextIsStatic() throws IOException
- {
- // This exists only for the sake of the OldFormatDeserializer
- throw new UnsupportedOperationException();
- }
-
- public Unfiltered readNext() throws IOException
- {
- isReady = false;
- if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
- {
- ClusteringBoundOrBoundary bound = clusteringDeserializer.deserializeNextBound();
- return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
- }
- else
- {
- builder.newRow(clusteringDeserializer.deserializeNextClustering());
- return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder);
- }
- }
-
- public void skipNext() throws IOException
- {
- isReady = false;
- clusteringDeserializer.skipNext();
- if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
- {
- UnfilteredSerializer.serializer.skipMarkerBody(in);
- }
- else
- {
- UnfilteredSerializer.serializer.skipRowBody(in);
- }
- }
-
- public void clearState()
- {
- isReady = false;
- isDone = false;
- }
-
- public long bytesReadForUnconsumedData()
+ else
{
- // In theory, hasNext() does consume 2-3 bytes, but we don't care about this for the current file format so returning
- // 0 to mean "do nothing".
- return 0;
+ builder.newRow(clusteringDeserializer.deserializeNextClustering());
+ return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder);
}
}
- public static class OldFormatDeserializer extends UnfilteredDeserializer
+ /**
+ * Clears any state in this deserializer.
+ */
+ public void clearState()
{
- private final boolean readAllAsDynamic;
- private boolean skipStatic;
-
- // The next Unfiltered to return, computed by hasNext()
- private Unfiltered next;
- // A temporary storage for an unfiltered that isn't returned next but should be looked at just afterwards
- private Unfiltered saved;
-
- private boolean isFirst = true;
-
- // The Unfiltered as read from the old format input
- private final UnfilteredIterator iterator;
-
- // The position in the input after the last data consumption (readNext/skipNext).
- private long lastConsumedPosition;
-
- private OldFormatDeserializer(CFMetaData metadata,
- DataInputPlus in,
- SerializationHelper helper,
- DeletionTime partitionDeletion,
- boolean readAllAsDynamic)
- {
- super(metadata, in, helper);
- this.iterator = new UnfilteredIterator(partitionDeletion);
- this.readAllAsDynamic = readAllAsDynamic;
- this.lastConsumedPosition = currentPosition();
- }
-
- public void setSkipStatic()
- {
- this.skipStatic = true;
- }
-
- private boolean isStatic(Unfiltered unfiltered)
- {
- return unfiltered.isRow() && ((Row)unfiltered).isStatic();
- }
-
- public boolean hasNext() throws IOException
- {
- try
- {
- while (next == null)
- {
- if (saved == null && !iterator.hasNext())
- return false;
-
- next = saved == null ? iterator.next() : saved;
- saved = null;
-
- // The sstable iterators assume that if there is one, the static row is the first thing this deserializer will return.
- // However, in the old format, a range tombstone with an empty start would sort before any static cell. So we should
- // detect that case and return the static parts first if necessary.
- if (isFirst && iterator.hasNext() && isStatic(iterator.peek()))
- {
- saved = next;
- next = iterator.next();
- }
- isFirst = false;
-
- // When reading old tables, we sometimes want to skip static data (due to how staticly defined column of compact
- // tables are handled).
- if (skipStatic && isStatic(next))
- next = null;
- }
- return true;
- }
- catch (IOError e)
- {
- if (e.getCause() != null && e.getCause() instanceof IOException)
- throw (IOException)e.getCause();
- throw e;
- }
- }
-
- private boolean isRow(LegacyLayout.LegacyAtom atom)
- {
- if (atom.isCell())
- return true;
-
- LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
- return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
- }
-
- public int compareNextTo(ClusteringBound bound) throws IOException
- {
- if (!hasNext())
- throw new IllegalStateException();
- return metadata.comparator.compare(next.clustering(), bound);
- }
-
- public boolean nextIsRow() throws IOException
- {
- if (!hasNext())
- throw new IllegalStateException();
- return next.isRow();
- }
-
- public boolean nextIsStatic() throws IOException
- {
- return nextIsRow() && ((Row)next).isStatic();
- }
-
- private long currentPosition()
- {
- // We return a bogus value if the input is not file based, but check we never rely
- // on that value in that case in bytesReadForUnconsumedData
- return in instanceof FileDataInput ? ((FileDataInput)in).getFilePointer() : 0;
- }
-
- public Unfiltered readNext() throws IOException
- {
- if (!hasNext())
- throw new IllegalStateException();
- Unfiltered toReturn = next;
- next = null;
- lastConsumedPosition = currentPosition();
- return toReturn;
- }
-
- public void skipNext() throws IOException
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- next = null;
- lastConsumedPosition = currentPosition();
- }
-
- public long bytesReadForUnconsumedData()
- {
- if (!(in instanceof FileDataInput))
- throw new AssertionError();
-
- return currentPosition() - lastConsumedPosition;
- }
-
- public void clearState()
- {
- next = null;
- saved = null;
- iterator.clearState();
- lastConsumedPosition = currentPosition();
- }
+ isReady = false;
+ isDone = false;
+ }
- // Groups atoms from the input into proper Unfiltered.
- // Note: this could use guava AbstractIterator except that we want to be able to clear
- // the internal state of the iterator so it's cleaner to do it ourselves.
- private class UnfilteredIterator implements PeekingIterator<Unfiltered>
+ /**
+ * Skips the next atom.
+ */
+ public void skipNext() throws IOException
+ {
+ isReady = false;
+ clusteringDeserializer.skipNext();
+ if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- private final AtomIterator atoms;
- private final LegacyLayout.CellGrouper grouper;
- private final TombstoneTracker tombstoneTracker;
-
- private Unfiltered next;
-
- private UnfilteredIterator(DeletionTime partitionDeletion)
- {
- this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
- this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
- this.atoms = new AtomIterator();
- }
-
- public boolean hasNext()
- {
- // Note that we loop on next == null because TombstoneTracker.openNew() could return null below or the atom might be shadowed.
- while (next == null)
- {
- if (atoms.hasNext())
- {
- // If a range tombstone closes strictly before the next row/RT, we need to return that close (or boundary) marker first.
- if (tombstoneTracker.hasClosingMarkerBefore(atoms.peek()))
- {
- next = tombstoneTracker.popClosingMarker();
- }
- else
- {
- LegacyLayout.LegacyAtom atom = atoms.next();
- if (!tombstoneTracker.isShadowed(atom))
- next = isRow(atom) ? readRow(atom) : tombstoneTracker.openNew(atom.asRangeTombstone());
- }
- }
- else if (tombstoneTracker.hasOpenTombstones())
- {
- next = tombstoneTracker.popClosingMarker();
- }
- else
- {
- return false;
- }
- }
- return true;
- }
-
- private Unfiltered readRow(LegacyLayout.LegacyAtom first)
- {
- LegacyLayout.CellGrouper grouper = first.isStatic()
- ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
- : this.grouper;
- grouper.reset();
- grouper.addAtom(first);
- // As long as atoms are part of the same row, consume them. Note that the call to addAtom() uses
- // atoms.peek() so that the atom is only consumed (by next) if it's part of the row (addAtom returns true)
- while (atoms.hasNext() && grouper.addAtom(atoms.peek()))
- {
- atoms.next();
- }
- return grouper.getRow();
- }
-
- public Unfiltered next()
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- Unfiltered toReturn = next;
- next = null;
- return toReturn;
- }
-
- public Unfiltered peek()
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- return next;
- }
-
- public void clearState()
- {
- atoms.clearState();
- tombstoneTracker.clearState();
- next = null;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
+ UnfilteredSerializer.serializer.skipMarkerBody(in);
}
-
- // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
- // Note: this could use guava AbstractIterator except that we want to be able to clear
- // the internal state of the iterator so it's cleaner to do it ourselves.
- private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
- {
- private boolean isDone;
- private LegacyLayout.LegacyAtom next;
-
- private AtomIterator()
- {
- }
-
- public boolean hasNext()
- {
- if (isDone)
- return false;
-
- if (next == null)
- {
- next = readAtom();
- if (next == null)
- {
- isDone = true;
- return false;
- }
- }
- return true;
- }
-
- private LegacyLayout.LegacyAtom readAtom()
- {
- try
- {
- return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public LegacyLayout.LegacyAtom next()
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- LegacyLayout.LegacyAtom toReturn = next;
- next = null;
- return toReturn;
- }
-
- public LegacyLayout.LegacyAtom peek()
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- return next;
- }
-
- public void clearState()
- {
- this.next = null;
- this.isDone = false;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Tracks which range tombstones are open when deserializing the old format.
- */
- private class TombstoneTracker
+ else
{
- private final DeletionTime partitionDeletion;
-
- // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
- // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
- // open tombstone (the one with the higher timestamp).
- private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
-
- public TombstoneTracker(DeletionTime partitionDeletion)
- {
- this.partitionDeletion = partitionDeletion;
- this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
- }
-
- /**
- * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
- */
- public boolean isShadowed(LegacyLayout.LegacyAtom atom)
- {
- assert !hasClosingMarkerBefore(atom);
- long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
-
- if (partitionDeletion.deletes(timestamp))
- return true;
-
- SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
- return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
- }
-
- /**
- * Whether the currently open marker closes stricly before the provided row/RT.
- */
- public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
- {
- return !openTombstones.isEmpty()
- && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
- }
-
- /**
- * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
- */
- public Unfiltered popClosingMarker()
- {
- assert !openTombstones.isEmpty();
-
- Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
- LegacyLayout.LegacyRangeTombstone first = iter.next();
- iter.remove();
-
- // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
- // next tombstone
- if (!iter.hasNext())
- return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
-
- LegacyLayout.LegacyRangeTombstone next = iter.next();
- return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
- }
-
- /**
- * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening
- * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
- * or even null (if the new tombston start is supersedes by the currently open tombstone).
- *
- * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
- * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
- */
- public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
- {
- if (openTombstones.isEmpty())
- {
- openTombstones.add(tombstone);
- return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
- }
-
- Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
- LegacyLayout.LegacyRangeTombstone first = iter.next();
- if (tombstone.deletionTime.supersedes(first.deletionTime))
- {
- // We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open
- // one and open the new one. We should also add the tombstone, but if it stop after the first one, we should
- // also remove that first tombstone as it won't be useful anymore.
- if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0)
- iter.remove();
-
- openTombstones.add(tombstone);
- return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime);
- }
- else
- {
- // If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we
- // just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone
- // simply extend after the first one and we'll deal with it later)
- assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) > 0;
- openTombstones.add(tombstone);
- return null;
- }
- }
-
- public boolean hasOpenTombstones()
- {
- return !openTombstones.isEmpty();
- }
-
- public void clearState()
- {
- openTombstones.clear();
- }
+ UnfilteredSerializer.serializer.skipRowBody(in);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index f7e614a..d435832 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -89,12 +89,6 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
// - we're querying static columns.
boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty();
- // For CQL queries on static compact tables, we only want to consider static value (only those are exposed),
- // but readStaticRow have already read them and might in fact have consumed the whole partition (when reading
- // the legacy file format), so set the reader to null so we don't try to read anything more. We can remove this
- // once we drop support for the legacy file format
- boolean needsReader = sstable.descriptor.version.storeRows() || isForThrift || !sstable.metadata.isStaticCompactTable();
-
if (needSeekAtPartitionStart)
{
// Not indexed (or is reading static), set to the beginning of the partition and read partition level deletion there
@@ -108,14 +102,14 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
// Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow
// (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format).
- this.reader = needsReader ? createReader(indexEntry, file, shouldCloseFile) : null;
- this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer);
+ this.reader = createReader(indexEntry, file, shouldCloseFile);
+ this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader.deserializer);
}
else
{
this.partitionLevelDeletion = indexEntry.deletionTime();
this.staticRow = Rows.EMPTY_STATIC_ROW;
- this.reader = needsReader ? createReader(indexEntry, file, shouldCloseFile) : null;
+ this.reader = createReader(indexEntry, file, shouldCloseFile);
}
if (reader != null && !slices.isEmpty())
@@ -168,33 +162,6 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
boolean isForThrift,
UnfilteredDeserializer deserializer) throws IOException
{
- if (!sstable.descriptor.version.storeRows())
- {
- if (!sstable.metadata.isCompactTable())
- {
- assert deserializer != null;
- return deserializer.hasNext() && deserializer.nextIsStatic()
- ? (Row)deserializer.readNext()
- : Rows.EMPTY_STATIC_ROW;
- }
-
- // For compact tables, we use statics for the "column_metadata" definition. However, in the old format, those
- // "column_metadata" are intermingled as any other "cell". In theory, this means that we'd have to do a first
- // pass to extract the static values. However, for thrift, we'll use the ThriftResultsMerger right away which
- // will re-merge static values with dynamic ones, so we can just ignore static and read every cell as a
- // "dynamic" one. For CQL, if the table is a "static compact", then is has only static columns exposed and no
- // dynamic ones. So we do a pass to extract static columns here, but will have no more work to do. Otherwise,
- // the table won't have static columns.
- if (statics.isEmpty() || isForThrift)
- return Rows.EMPTY_STATIC_ROW;
-
- assert sstable.metadata.isStaticCompactTable();
-
- // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the
- // static ones. So we don't have to mark the position to seek back later.
- return LegacyLayout.extractStaticColumns(sstable.metadata, file, statics);
- }
-
if (!sstable.header.hasStatic())
return Rows.EMPTY_STATIC_ROW;
@@ -345,7 +312,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
private void createDeserializer()
{
assert file != null && deserializer == null;
- deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion, isForThrift);
+ deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper);
}
protected void seekToPosition(long position) throws IOException
@@ -550,8 +517,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
public boolean isPastCurrentBlock() throws IOException
{
assert reader.deserializer != null;
- long correction = reader.deserializer.bytesReadForUnconsumedData();
- return reader.file.bytesPastMark(mark) - correction >= currentIndex().width;
+ return reader.file.bytesPastMark(mark) >= currentIndex().width;
}
public int currentBlockIdx()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index b3c2e94..aa0a390 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -257,12 +257,10 @@ public class SSTableIterator extends AbstractSSTableIterator
// so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the
// whole slice is between the previous block end and this block start, and thus has no corresponding
// data. One exception is if the previous block ends with an openMarker as it will cover our slice
- // and we need to return it (we also don't skip the slice for the old format because we didn't have the openMarker
- // info in that case and can't rely on this optimization).
+ // and we need to return it.
if (indexState.currentBlockIdx() == lastBlockIdx
&& metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0
- && openMarker == null
- && sstable.descriptor.version.storeRows())
+ && openMarker == null)
{
sliceDone = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index c74b5db..ca0cce2 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -310,23 +310,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
int currentBlock = indexState.currentBlockIdx();
boolean canIncludeSliceStart = currentBlock == lastBlockIdx;
-
- // When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can
- // start at the end of a block and end at the beginning of the next one. That's not a problem per se for
- // UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index
- // blocks, but as we reading index block in reverse we must be careful to not read the end of the row at
- // beginning of a block before we're reading the beginning of that row. So what we do is that if we detect
- // that the row starting this block is also the row ending the previous one, we skip that first result and
- // let it be read when we'll read the previous block.
- boolean includeFirst = true;
- if (!sstable.descriptor.version.storeRows() && currentBlock > 0)
- {
- ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName;
- ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName;
- includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0;
- }
-
- loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst);
+ loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index a30ca0e..c7c5971 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -233,7 +233,7 @@ public class CommitLogArchiver
throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName))
throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
- else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
+ else if (fromName != null && fromHeader == null)
throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
else if (fromHeader != null)
descriptor = fromHeader;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 088d44a..0ab191d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -57,10 +57,7 @@ public class CommitLogDescriptor
static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
static final String COMPRESSION_CLASS_KEY = "compressionClass";
- public static final int VERSION_12 = 2;
- public static final int VERSION_20 = 3;
- public static final int VERSION_21 = 4;
- public static final int VERSION_22 = 5;
+ // We don't support anything pre-3.0
public static final int VERSION_30 = 6;
/**
@@ -104,20 +101,15 @@ public class CommitLogDescriptor
out.putLong(descriptor.id);
updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
updateChecksumInt(crc, (int) (descriptor.id >>> 32));
- if (descriptor.version >= VERSION_22)
- {
- String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
- byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
- if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
- throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
- parametersBytes.length));
- out.putShort((short) parametersBytes.length);
- updateChecksumInt(crc, parametersBytes.length);
- out.put(parametersBytes);
- crc.update(parametersBytes, 0, parametersBytes.length);
- }
- else
- assert descriptor.compression == null;
+ String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
+ byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
+ if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
+ throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
+ parametersBytes.length));
+ out.putShort((short) parametersBytes.length);
+ updateChecksumInt(crc, parametersBytes.length);
+ out.put(parametersBytes);
+ crc.update(parametersBytes, 0, parametersBytes.length);
out.putInt((int) crc.getValue());
}
@@ -157,16 +149,15 @@ public class CommitLogDescriptor
{
CRC32 checkcrc = new CRC32();
int version = input.readInt();
+ if (version < VERSION_30)
+ throw new IllegalArgumentException("Unsupported pre-3.0 commit log found; cannot read.");
+
updateChecksumInt(checkcrc, version);
long id = input.readLong();
updateChecksumInt(checkcrc, (int) (id & 0xFFFFFFFFL));
updateChecksumInt(checkcrc, (int) (id >>> 32));
- int parametersLength = 0;
- if (version >= VERSION_22)
- {
- parametersLength = input.readShort() & 0xFFFF;
- updateChecksumInt(checkcrc, parametersLength);
- }
+ int parametersLength = input.readShort() & 0xFFFF;
+ updateChecksumInt(checkcrc, parametersLength);
// This should always succeed as parametersLength cannot be too long even for a
// corrupt segment file.
byte[] parametersBytes = new byte[parametersLength];
@@ -213,14 +204,6 @@ public class CommitLogDescriptor
{
switch (version)
{
- case VERSION_12:
- return MessagingService.VERSION_12;
- case VERSION_20:
- return MessagingService.VERSION_20;
- case VERSION_21:
- return MessagingService.VERSION_21;
- case VERSION_22:
- return MessagingService.VERSION_22;
case VERSION_30:
return MessagingService.VERSION_30;
default:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a..eb745c7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -122,19 +122,6 @@ public class CommitLogReader
try(RandomAccessReader reader = RandomAccessReader.open(file))
{
- if (desc.version < CommitLogDescriptor.VERSION_21)
- {
- if (!shouldSkipSegmentId(file, desc, minPosition))
- {
- if (minPosition.segmentId == desc.id)
- reader.seek(minPosition.position);
- ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
- statusTracker.errorContext = desc.fileName();
- readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
- }
- return;
- }
-
final long segmentIdFromFilename = desc.id;
try
{
@@ -430,42 +417,17 @@ public class CommitLogReader
{
public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
{
- switch (commitLogVersion)
- {
- case CommitLogDescriptor.VERSION_12:
- case CommitLogDescriptor.VERSION_20:
- return input.readLong();
- // Changed format in 2.1
- default:
- return input.readInt() & 0xffffffffL;
- }
+ return input.readInt() & 0xffffffffL;
}
public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
{
- switch (commitLogVersion)
- {
- case CommitLogDescriptor.VERSION_12:
- checksum.update(serializedSize);
- break;
- // Changed format in 2.0
- default:
- updateChecksumInt(checksum, serializedSize);
- break;
- }
+ updateChecksumInt(checksum, serializedSize);
}
public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
{
- switch (commitLogVersion)
- {
- case CommitLogDescriptor.VERSION_12:
- case CommitLogDescriptor.VERSION_20:
- return input.readLong();
- // Changed format in 2.1
- default:
- return input.readInt() & 0xffffffffL;
- }
+ return input.readInt() & 0xffffffffL;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a22cda5..78650f1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1242,7 +1242,7 @@ public class CompactionManager implements CompactionManagerMBean
header = SerializationHeader.make(sstable.metadata, Collections.singleton(sstable));
return SSTableWriter.create(cfs.metadata,
- Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
+ cfs.newSSTableDescriptor(compactionFileLocation),
expectedBloomFilterSize,
repairedAt,
sstable.getSSTableLevel(),
@@ -1274,7 +1274,7 @@ public class CompactionManager implements CompactionManagerMBean
break;
}
}
- return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
+ return SSTableWriter.create(cfs.newSSTableDescriptor(compactionFileLocation),
(long) expectedBloomFilterSize,
repairedAt,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 7a5b719..aedb208 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -70,7 +70,7 @@ public class Upgrader
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
- return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)),
+ return SSTableWriter.create(cfs.newSSTableDescriptor(directory),
estimatedRows,
repairedAt,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index df659e4..a52dd82 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -97,8 +97,7 @@ public class Verifier implements Closeable
{
validator = null;
- if (sstable.descriptor.digestComponent != null &&
- new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent)).exists())
+ if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
{
validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
validator.validate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index f8ecd87..d279321 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -69,7 +69,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
public void switchCompactionLocation(Directories.DataDirectory directory)
{
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
+ SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)),
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 0beb505..a3d8c98 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -105,7 +105,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
{
this.sstableDirectory = location;
averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
- sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
+ sstableWriter.switchWriter(SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
keysPerSSTable,
minRepairedAt,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 864185e..7acb870 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -108,7 +108,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
{
sstableDirectory = location;
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
+ SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 46cb891..a01672e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -104,7 +104,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
this.location = location;
long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+ SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(location)),
currentPartitionsToWrite,
minRepairedAt,
cfs.metadata,