You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:50:01 UTC
[08/11] cassandra git commit: Remove pre-3.0 compatibility code for
4.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4c0608f..5baf783 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -509,16 +509,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
{
public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
{
- if (version >= MessagingService.VERSION_30)
- out.writeByte(expression.kind().ordinal());
+ out.writeByte(expression.kind().ordinal());
// Custom expressions include neither a column nor an operator, but all
- // other expressions do. Also, custom expressions are 3.0+ only, so
- // the column & operator will always be the first things written for
- // any pre-3.0 version
+ // other expressions do.
if (expression.kind() == Kind.CUSTOM)
{
- assert version >= MessagingService.VERSION_30;
IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version);
ByteBufferUtil.writeWithShortLength(expression.value, out);
return;
@@ -526,7 +522,6 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
if (expression.kind() == Kind.USER)
{
- assert version >= MessagingService.VERSION_30;
UserExpression.serialize((UserExpression)expression, out, version);
return;
}
@@ -541,15 +536,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
break;
case MAP_EQUALITY:
MapEqualityExpression mexpr = (MapEqualityExpression)expression;
- if (version < MessagingService.VERSION_30)
- {
- ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
- }
- else
- {
- ByteBufferUtil.writeWithShortLength(mexpr.key, out);
- ByteBufferUtil.writeWithShortLength(mexpr.value, out);
- }
+ ByteBufferUtil.writeWithShortLength(mexpr.key, out);
+ ByteBufferUtil.writeWithShortLength(mexpr.value, out);
break;
case THRIFT_DYN_EXPR:
ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
@@ -559,62 +547,33 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
- Kind kind = null;
- ByteBuffer name;
- Operator operator;
- ColumnDefinition column;
+ Kind kind = Kind.values()[in.readByte()];
- if (version >= MessagingService.VERSION_30)
+ // custom expressions (3.0+ only) do not contain a column or operator, only a value
+ if (kind == Kind.CUSTOM)
{
- kind = Kind.values()[in.readByte()];
- // custom expressions (3.0+ only) do not contain a column or operator, only a value
- if (kind == Kind.CUSTOM)
- {
- return new CustomExpression(metadata,
- IndexMetadata.serializer.deserialize(in, version, metadata),
- ByteBufferUtil.readWithShortLength(in));
- }
-
- if (kind == Kind.USER)
- {
- return UserExpression.deserialize(in, version, metadata);
- }
+ return new CustomExpression(metadata,
+ IndexMetadata.serializer.deserialize(in, version, metadata),
+ ByteBufferUtil.readWithShortLength(in));
}
- name = ByteBufferUtil.readWithShortLength(in);
- operator = Operator.readFrom(in);
- column = metadata.getColumnDefinition(name);
+ if (kind == Kind.USER)
+ return UserExpression.deserialize(in, version, metadata);
+
+ ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+ Operator operator = Operator.readFrom(in);
+ ColumnDefinition column = metadata.getColumnDefinition(name);
+
if (!metadata.isCompactTable() && column == null)
throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
- if (version < MessagingService.VERSION_30)
- {
- if (column == null)
- kind = Kind.THRIFT_DYN_EXPR;
- else if (column.type instanceof MapType && operator == Operator.EQ)
- kind = Kind.MAP_EQUALITY;
- else
- kind = Kind.SIMPLE;
- }
-
- assert kind != null;
switch (kind)
{
case SIMPLE:
return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
case MAP_EQUALITY:
- ByteBuffer key, value;
- if (version < MessagingService.VERSION_30)
- {
- ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
- key = CompositeType.extractComponent(composite, 0);
- value = CompositeType.extractComponent(composite, 0);
- }
- else
- {
- key = ByteBufferUtil.readWithShortLength(in);
- value = ByteBufferUtil.readWithShortLength(in);
- }
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ ByteBuffer value = ByteBufferUtil.readWithShortLength(in);
return new MapEqualityExpression(column, key, operator, value);
case THRIFT_DYN_EXPR:
return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
@@ -622,16 +581,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
throw new AssertionError();
}
-
public long serializedSize(Expression expression, int version)
{
- // version 3.0+ includes a byte for Kind
- long size = version >= MessagingService.VERSION_30 ? 1 : 0;
+ long size = 1; // kind byte
// Custom expressions include neither a column nor an operator, but all
- // other expressions do. Also, custom expressions are 3.0+ only, so
- // the column & operator will always be the first things written for
- // any pre-3.0 version
+ // other expressions do.
if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER)
size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
+ expression.operator.serializedSize();
@@ -643,23 +598,19 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
break;
case MAP_EQUALITY:
MapEqualityExpression mexpr = (MapEqualityExpression)expression;
- if (version < MessagingService.VERSION_30)
- size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue());
- else
- size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
- + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
+ size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
+ + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
break;
case THRIFT_DYN_EXPR:
size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value);
break;
case CUSTOM:
- if (version >= MessagingService.VERSION_30)
- size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
- + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
+ size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
+ + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
break;
case USER:
- if (version >= MessagingService.VERSION_30)
- size += UserExpression.serializedSize((UserExpression)expression, version);
+ size += UserExpression.serializedSize((UserExpression)expression, version);
+ break;
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index b95a310..1ed961f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -238,12 +238,10 @@ public class PartitionUpdate extends AbstractBTreePartition
*
* @param bytes the byte buffer that contains the serialized update.
* @param version the version with which the update is serialized.
- * @param key the partition key for the update. This is only used if {@code version < 3.0}
- * and can be {@code null} otherwise.
*
* @return the deserialized update or {@code null} if {@code bytes == null}.
*/
- public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key)
+ public static PartitionUpdate fromBytes(ByteBuffer bytes, int version)
{
if (bytes == null)
return null;
@@ -252,8 +250,7 @@ public class PartitionUpdate extends AbstractBTreePartition
{
return serializer.deserialize(new DataInputBuffer(bytes, true),
version,
- SerializationHelper.Flag.LOCAL,
- version < MessagingService.VERSION_30 ? key : null);
+ SerializationHelper.Flag.LOCAL);
}
catch (IOException e)
{
@@ -780,47 +777,12 @@ public class PartitionUpdate extends AbstractBTreePartition
{
assert !iter.isReverseOrder();
- if (version < MessagingService.VERSION_30)
- {
- LegacyLayout.serializeAsLegacyPartition(null, iter, out, version);
- }
- else
- {
- CFMetaData.serializer.serialize(update.metadata(), out, version);
- UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount());
- }
+ CFMetaData.serializer.serialize(update.metadata(), out, version);
+ UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount());
}
}
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
- {
- if (version >= MessagingService.VERSION_30)
- {
- assert key == null; // key is only there for the old format
- return deserialize30(in, version, flag);
- }
- else
- {
- assert key != null;
- return deserializePre30(in, version, flag, key);
- }
- }
-
- // Used to share same decorated key between updates.
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
- {
- if (version >= MessagingService.VERSION_30)
- {
- return deserialize30(in, version, flag);
- }
- else
- {
- assert key != null;
- return deserializePre30(in, version, flag, key.getKey());
- }
- }
-
- private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
{
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag);
@@ -854,22 +816,10 @@ public class PartitionUpdate extends AbstractBTreePartition
false);
}
- private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
- {
- try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
- {
- assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
- return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata()));
- }
- }
-
public long serializedSize(PartitionUpdate update, int version)
{
try (UnfilteredRowIterator iter = update.unfilteredIterator())
{
- if (version < MessagingService.VERSION_30)
- return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version);
-
return CFMetaData.serializer.serializedSize(update.metadata(), version)
+ UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 852d95e..bcc8d4d 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -252,13 +252,11 @@ public abstract class UnfilteredPartitionIterators
/**
* Digests the provided iterator.
*
- * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
- * as this is only used when producing digest to be sent to legacy nodes.
* @param iterator the iterator to digest.
* @param digest the {@code MessageDigest} to use for the digest.
* @param version the messaging protocol to use when producing the digest.
*/
- public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
+ public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
{
try (UnfilteredPartitionIterator iter = iterator)
{
@@ -266,7 +264,7 @@ public abstract class UnfilteredPartitionIterators
{
try (UnfilteredRowIterator partition = iter.next())
{
- UnfilteredRowIterators.digest(command, partition, digest, version);
+ UnfilteredRowIterators.digest(partition, digest, version);
}
}
}
@@ -303,8 +301,6 @@ public abstract class UnfilteredPartitionIterators
{
public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
{
- assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer
-
out.writeBoolean(iter.isForThrift());
while (iter.hasNext())
{
@@ -319,7 +315,6 @@ public abstract class UnfilteredPartitionIterators
public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
{
- assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer
final boolean isForThrift = in.readBoolean();
return new AbstractUnfilteredPartitionIterator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 14730ac..30a0a37 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -211,12 +211,11 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
/**
* @return true if we can use the clustering values in the stats of the sstable:
- * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size)
- * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
+ * we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
*/
private boolean canUseMetadataLowerBound()
{
- return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile();
+ return !sstable.hasTombstones();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 46447ec..004783e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -143,20 +143,12 @@ public abstract class UnfilteredRowIterators
/**
* Digests the partition represented by the provided iterator.
*
- * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
- * as this is only used when producing digest to be sent to legacy nodes.
* @param iterator the iterator to digest.
* @param digest the {@code MessageDigest} to use for the digest.
* @param version the messaging protocol to use when producing the digest.
*/
- public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
+ public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version)
{
- if (version < MessagingService.VERSION_30)
- {
- LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
- return;
- }
-
digest.update(iterator.partitionKey().getKey().duplicate());
iterator.partitionLevelDeletion().digest(digest);
iterator.columns().regulars.digest(digest);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index 298c316..7a603b0 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -184,6 +184,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
* The first int tells us if it's a range or bounds (depending on the value) _and_ if it's tokens or keys (depending on the
* sign). We use negative kind for keys so as to preserve the serialization of token from older version.
*/
+ // !WARNING! While we don't support the pre-3.0 messaging protocol, we serialize the token range in the
+ // system table (see SystemKeyspace.rangeToBytes) using the old/pre-3.0 format and until we deal with that
+ // problem, we have to preserve this code.
if (version < MessagingService.VERSION_30)
out.writeInt(kindInt(range));
else
@@ -195,6 +198,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
public AbstractBounds<T> deserialize(DataInput in, IPartitioner p, int version) throws IOException
{
boolean isToken, startInclusive, endInclusive;
+ // !WARNING! See serialize method above for why we still need to have that condition.
if (version < MessagingService.VERSION_30)
{
int kind = in.readInt();
@@ -226,6 +230,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
public long serializedSize(AbstractBounds<T> ab, int version)
{
+ // !WARNING! See serialize method above for why we still need to have that condition.
int size = version < MessagingService.VERSION_30
? TypeSizes.sizeof(kindInt(ab))
: 1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 212f88c..c4c3872 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -979,12 +979,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private void markAlive(final InetAddress addr, final EndpointState localState)
{
- if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20)
- {
- realMarkAlive(addr, localState);
- return;
- }
-
localState.markDead();
MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
deleted file mode 100644
index 50d8b6e..0000000
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hints;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format.
- */
-@SuppressWarnings("deprecation")
-public final class LegacyHintsMigrator
-{
- private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class);
-
- private final File hintsDirectory;
- private final long maxHintsFileSize;
-
- private final ColumnFamilyStore legacyHintsTable;
- private final int pageSize;
-
- public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize)
- {
- this.hintsDirectory = hintsDirectory;
- this.maxHintsFileSize = maxHintsFileSize;
-
- legacyHintsTable = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
- pageSize = calculatePageSize(legacyHintsTable);
- }
-
- // read fewer columns (mutations) per page if they are very large
- private static int calculatePageSize(ColumnFamilyStore legacyHintsTable)
- {
- int size = 128;
-
- int meanCellCount = legacyHintsTable.getMeanColumns();
- double meanPartitionSize = legacyHintsTable.getMeanPartitionSize();
-
- if (meanCellCount != 0 && meanPartitionSize != 0)
- {
- int avgHintSize = (int) meanPartitionSize / meanCellCount;
- size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize));
- }
-
- return size;
- }
-
- public void migrate()
- {
- // nothing to migrate
- if (legacyHintsTable.isEmpty())
- return;
- logger.info("Migrating legacy hints to new storage");
-
- // major-compact all of the existing sstables to get rid of the tombstones + expired hints
- logger.info("Forcing a major compaction of {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
- compactLegacyHints();
-
- // paginate over legacy hints and write them to the new storage
- logger.info("Writing legacy hints to the new storage");
- migrateLegacyHints();
-
- // truncate the legacy hints table
- logger.info("Truncating {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
- legacyHintsTable.truncateBlocking();
- }
-
- private void compactLegacyHints()
- {
- Collection<Descriptor> descriptors = new ArrayList<>();
- legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor));
- if (!descriptors.isEmpty())
- forceCompaction(descriptors);
- }
-
- private void forceCompaction(Collection<Descriptor> descriptors)
- {
- try
- {
- CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get();
- }
- catch (InterruptedException | ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private void migrateLegacyHints()
- {
- ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
- String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
- //noinspection ConstantConditions
- QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer));
- FileUtils.clean(buffer);
- }
-
- private void migrateLegacyHints(UUID hostId, ByteBuffer buffer)
- {
- String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " +
- "FROM %s.%s " +
- "WHERE target_id = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_HINTS);
-
- // read all the old hints (paged iterator), write them in the new format
- UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId);
- migrateLegacyHints(hostId, rows, buffer);
-
- // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows
- // to not lose progress in case of a terminated conversion
- deleteLegacyHintsPartition(hostId);
- }
-
- private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer)
- {
- migrateLegacyHints(hostId, rows.iterator(), buffer);
- }
-
- private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
- {
- do
- {
- migrateLegacyHintsInternal(hostId, iterator, buffer);
- // if there are hints that didn't fit in the previous file, keep calling the method to write to a new
- // file until we get everything written.
- }
- while (iterator.hasNext());
- }
-
- private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
- {
- HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis());
-
- try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor))
- {
- try (HintsWriter.Session session = writer.newSession(buffer))
- {
- while (iterator.hasNext())
- {
- Hint hint = convertLegacyHint(iterator.next());
- if (hint != null)
- session.append(hint);
-
- if (session.position() >= maxHintsFileSize)
- break;
- }
- }
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, descriptor.fileName());
- }
- }
-
- private static Hint convertLegacyHint(UntypedResultSet.Row row)
- {
- Mutation mutation = deserializeLegacyMutation(row);
- if (mutation == null)
- return null;
-
- long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table
- int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl");
- int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime);
-
- int gcgs = Math.min(originalGCGS, mutation.smallestGCGS());
-
- return Hint.create(mutation, creationTime, gcgs);
- }
-
- private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row)
- {
- try (DataInputBuffer dib = new DataInputBuffer(row.getBlob("mutation"), true))
- {
- Mutation mutation = Mutation.serializer.deserialize(dib,
- row.getInt("message_version"));
- mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
- return mutation;
- }
- catch (IOException e)
- {
- logger.error("Failed to migrate a hint for {} from legacy {}.{} table",
- row.getUUID("target_id"),
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_HINTS,
- e);
- return null;
- }
- catch (MarshalException e)
- {
- logger.warn("Failed to validate a hint for {} from legacy {}.{} table - skipping",
- row.getUUID("target_id"),
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_HINTS,
- e);
- return null;
- }
- }
-
- private static void deleteLegacyHintsPartition(UUID hostId)
- {
- // intentionally use millis, like the rest of the legacy implementation did, just in case
- Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints,
- UUIDType.instance.decompose(hostId),
- System.currentTimeMillis(),
- FBUtilities.nowInSeconds()));
- mutation.applyUnsafe();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
deleted file mode 100644
index 64f91d7..0000000
--- a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-/**
- * A serializer which forwards all its method calls to another serializer. Subclasses should override one or more
- * methods to modify the behavior of the backing serializer as desired per the decorator pattern.
- */
-public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T>
-{
- protected ForwardingVersionedSerializer()
- {
- }
-
- /**
- * Returns the backing delegate instance that methods are forwarded to.
- *
- * @param version the server version
- * @return the backing delegate instance that methods are forwarded to.
- */
- protected abstract IVersionedSerializer<T> delegate(int version);
-
- public void serialize(T t, DataOutputPlus out, int version) throws IOException
- {
- delegate(version).serialize(t, out, version);
- }
-
- public T deserialize(DataInputPlus in, int version) throws IOException
- {
- return delegate(version).deserialize(in, version);
- }
-
- public long serializedSize(T t, int version)
- {
- return delegate(version).serializedSize(t, version);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9a4d919..7d98570 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -71,7 +71,6 @@ public class CompressionMetadata
private final long chunkOffsetsSize;
public final String indexFilePath;
public final CompressionParams parameters;
- public final ChecksumType checksumType;
/**
* Create metadata about given compressed file including uncompressed data length, chunk size
@@ -87,14 +86,13 @@ public class CompressionMetadata
public static CompressionMetadata create(String dataFilePath)
{
Descriptor desc = Descriptor.fromFilename(dataFilePath);
- return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.compressedChecksumType());
+ return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
}
@VisibleForTesting
- public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
+ public CompressionMetadata(String indexFilePath, long compressedLength)
{
this.indexFilePath = indexFilePath;
- this.checksumType = checksumType;
try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
{
@@ -133,7 +131,7 @@ public class CompressionMetadata
this.chunkOffsetsSize = chunkOffsets.size();
}
- private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, ChecksumType checksumType)
+ private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength)
{
this.indexFilePath = filePath;
this.parameters = parameters;
@@ -141,7 +139,6 @@ public class CompressionMetadata
this.compressedFileLength = compressedLength;
this.chunkOffsets = offsets;
this.chunkOffsetsSize = offsetsSize;
- this.checksumType = checksumType;
}
public ICompressor compressor()
@@ -417,7 +414,7 @@ public class CompressionMetadata
if (count < this.count)
compressedLength = offsets.getLong(count * 8L);
- return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, ChecksumType.CRC32);
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 9a8f968..7efca63 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.FilenameFilter;
+import java.io.FileFilter;
import java.io.IOException;
import java.io.Closeable;
import java.nio.ByteBuffer;
@@ -90,12 +90,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
private static int getNextGeneration(File directory, final String columnFamily)
{
final Set<Descriptor> existing = new HashSet<>();
- directory.list(new FilenameFilter()
+ directory.listFiles(new FileFilter()
{
- public boolean accept(File dir, String name)
+ public boolean accept(File file)
{
- Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
- Descriptor desc = p == null ? null : p.left;
+ Descriptor desc = SSTable.tryDescriptorFromFilename(file);
if (desc == null)
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 38152af..469a25c 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -50,8 +50,8 @@ public class Component
COMPRESSION_INFO("CompressionInfo.db"),
// statistical metadata about the content of the sstable
STATS("Statistics.db"),
- // holds adler32 checksum of the data file
- DIGEST("Digest.crc32", "Digest.adler32", "Digest.sha1"),
+ // holds CRC32 checksum of the data file
+ DIGEST("Digest.crc32"),
// holds the CRC32 for chunks in an a uncompressed file.
CRC("CRC.db"),
// holds SSTable Index Summary (sampling of Index component)
@@ -61,16 +61,11 @@ public class Component
// built-in secondary index (may be multiple per sstable)
SECONDARY_INDEX("SI_.*.db"),
// custom component, used by e.g. custom compaction strategy
- CUSTOM(new String[] { null });
+ CUSTOM(null);
- final String[] repr;
+ final String repr;
Type(String repr)
{
- this(new String[] { repr });
- }
-
- Type(String... repr)
- {
this.repr = repr;
}
@@ -78,9 +73,7 @@ public class Component
{
for (Type type : TYPES)
{
- if (type.repr == null || type.repr.length == 0 || type.repr[0] == null)
- continue;
- if (Pattern.matches(type.repr[0], repr))
+ if (type.repr != null && Pattern.matches(type.repr, repr))
return type;
}
return CUSTOM;
@@ -93,36 +86,18 @@ public class Component
public final static Component FILTER = new Component(Type.FILTER);
public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
public final static Component STATS = new Component(Type.STATS);
- private static final String digestCrc32 = "Digest.crc32";
- private static final String digestAdler32 = "Digest.adler32";
- private static final String digestSha1 = "Digest.sha1";
- public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, digestCrc32);
- public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, digestAdler32);
- public final static Component DIGEST_SHA1 = new Component(Type.DIGEST, digestSha1);
+ public final static Component DIGEST = new Component(Type.DIGEST);
public final static Component CRC = new Component(Type.CRC);
public final static Component SUMMARY = new Component(Type.SUMMARY);
public final static Component TOC = new Component(Type.TOC);
- public static Component digestFor(ChecksumType checksumType)
- {
- switch (checksumType)
- {
- case Adler32:
- return DIGEST_ADLER32;
- case CRC32:
- return DIGEST_CRC32;
- }
- throw new AssertionError();
- }
-
public final Type type;
public final String name;
public final int hashCode;
public Component(Type type)
{
- this(type, type.repr[0]);
- assert type.repr.length == 1;
+ this(type, type.repr);
assert type != Type.CUSTOM;
}
@@ -143,45 +118,32 @@ public class Component
}
/**
- * {@code
- * Filename of the form "<ksname>/<cfname>-[tmp-][<version>-]<gen>-<component>",
- * }
- * @return A Descriptor for the SSTable, and a Component for this particular file.
- * TODO move descriptor into Component field
+ * Parse the component part of a sstable filename into a {@code Component} object.
+ *
+ * @param name a string representing a sstable component.
+ * @return the component corresponding to {@code name}. Note that this always returns a component, as an unrecognized
+ * name is parsed into a CUSTOM component.
*/
- public static Pair<Descriptor,Component> fromFilename(File directory, String name)
+ static Component parse(String name)
{
- Pair<Descriptor,String> path = Descriptor.fromFilename(directory, name);
+ Type type = Type.fromRepresentation(name);
- // parse the component suffix
- Type type = Type.fromRepresentation(path.right);
- // build (or retrieve singleton for) the component object
- Component component;
- switch(type)
+ // Build (or retrieve singleton for) the component object
+ switch (type)
{
- case DATA: component = Component.DATA; break;
- case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break;
- case FILTER: component = Component.FILTER; break;
- case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
- case STATS: component = Component.STATS; break;
- case DIGEST: switch (path.right)
- {
- case digestCrc32: component = Component.DIGEST_CRC32; break;
- case digestAdler32: component = Component.DIGEST_ADLER32; break;
- case digestSha1: component = Component.DIGEST_SHA1; break;
- default: throw new IllegalArgumentException("Invalid digest component " + path.right);
- }
- break;
- case CRC: component = Component.CRC; break;
- case SUMMARY: component = Component.SUMMARY; break;
- case TOC: component = Component.TOC; break;
- case SECONDARY_INDEX: component = new Component(Type.SECONDARY_INDEX, path.right); break;
- case CUSTOM: component = new Component(Type.CUSTOM, path.right); break;
- default:
- throw new IllegalStateException();
+ case DATA: return Component.DATA;
+ case PRIMARY_INDEX: return Component.PRIMARY_INDEX;
+ case FILTER: return Component.FILTER;
+ case COMPRESSION_INFO: return Component.COMPRESSION_INFO;
+ case STATS: return Component.STATS;
+ case DIGEST: return Component.DIGEST;
+ case CRC: return Component.CRC;
+ case SUMMARY: return Component.SUMMARY;
+ case TOC: return Component.TOC;
+ case SECONDARY_INDEX: return new Component(Type.SECONDARY_INDEX, name);
+ case CUSTOM: return new Component(Type.CUSTOM, name);
+ default: throw new AssertionError();
}
-
- return Pair.create(path.left, component);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 1f7e67f..3804fd8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -26,12 +26,12 @@ import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
-import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
import org.apache.cassandra.utils.Pair;
@@ -46,8 +46,13 @@ import static org.apache.cassandra.io.sstable.Component.separator;
*/
public class Descriptor
{
+ private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$";
+ private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR);
+
public static String TMP_EXT = ".tmp";
+ private static final Splitter filenameSplitter = Splitter.on('-');
+
/** canonicalized path to the directory where SSTable resides */
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
@@ -56,8 +61,6 @@ public class Descriptor
public final String cfname;
public final int generation;
public final SSTableFormat.Type formatType;
- /** digest component - might be {@code null} for old, legacy sstables */
- public final Component digestComponent;
private final int hashCode;
/**
@@ -66,7 +69,7 @@ public class Descriptor
@VisibleForTesting
public Descriptor(File directory, String ksname, String cfname, int generation)
{
- this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current(), null);
+ this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current());
}
/**
@@ -74,16 +77,10 @@ public class Descriptor
*/
public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
- this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
- }
-
- @VisibleForTesting
- public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
- {
- this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
+ this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType);
}
- public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)
+ public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
this.version = version;
@@ -99,24 +96,18 @@ public class Descriptor
this.cfname = cfname;
this.generation = generation;
this.formatType = formatType;
- this.digestComponent = digestComponent;
hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType);
}
public Descriptor withGeneration(int newGeneration)
{
- return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType, digestComponent);
+ return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType);
}
public Descriptor withFormatType(SSTableFormat.Type newType)
{
- return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType, digestComponent);
- }
-
- public Descriptor withDigestComponent(Component newDigestComponent)
- {
- return new Descriptor(version, directory, ksname, cfname, generation, formatType, newDigestComponent);
+ return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType);
}
public String tmpFilenameFor(Component component)
@@ -139,15 +130,9 @@ public class Descriptor
private void appendFileName(StringBuilder buff)
{
- if (!version.hasNewFileName())
- {
- buff.append(ksname).append(separator);
- buff.append(cfname).append(separator);
- }
buff.append(version).append(separator);
buff.append(generation);
- if (formatType != SSTableFormat.Type.LEGACY)
- buff.append(separator).append(formatType.name);
+ buff.append(separator).append(formatType.name);
}
public String relativeFilenameFor(Component component)
@@ -176,155 +161,156 @@ public class Descriptor
return ret;
}
- /**
- * Files obsoleted by CASSANDRA-7066 : temporary files and compactions_in_progress. We support
- * versions 2.1 (ka) and 2.2 (la).
- * Temporary files have tmp- or tmplink- at the beginning for 2.2 sstables or after ks-cf- for 2.1 sstables
- */
-
- private final static String LEGACY_COMP_IN_PROG_REGEX_STR = "^compactions_in_progress(\\-[\\d,a-f]{32})?$";
- private final static Pattern LEGACY_COMP_IN_PROG_REGEX = Pattern.compile(LEGACY_COMP_IN_PROG_REGEX_STR);
- private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$";
- private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR);
-
- public static boolean isLegacyFile(File file)
+ public static boolean isValidFile(File file)
{
- if (file.isDirectory())
- return file.getParentFile() != null &&
- file.getParentFile().getName().equalsIgnoreCase("system") &&
- LEGACY_COMP_IN_PROG_REGEX.matcher(file.getName()).matches();
- else
- return LEGACY_TMP_REGEX.matcher(file.getName()).matches();
- }
-
- public static boolean isValidFile(String fileName)
- {
- return fileName.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(fileName).matches();
+ String filename = file.getName();
+ return filename.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(filename).matches();
}
/**
- * @see #fromFilename(File directory, String name)
- * @param filename The SSTable filename
- * @return Descriptor of the SSTable initialized from filename
+ * Parse a sstable filename into a Descriptor.
+ * <p>
+ * This is a shortcut for {@code fromFilename(new File(filename))}.
+ *
+ * @param filename the filename to a sstable component.
+ * @return the descriptor for the parsed file.
+ *
+ * @throws IllegalArgumentException if the provided {@code filename} does not point to a valid sstable filename. This
+ * could mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+ * version.
*/
public static Descriptor fromFilename(String filename)
{
- return fromFilename(filename, false);
- }
-
- public static Descriptor fromFilename(String filename, SSTableFormat.Type formatType)
- {
- return fromFilename(filename).withFormatType(formatType);
+ return fromFilename(new File(filename));
}
- public static Descriptor fromFilename(String filename, boolean skipComponent)
- {
- File file = new File(filename).getAbsoluteFile();
- return fromFilename(file.getParentFile(), file.getName(), skipComponent).left;
- }
-
- public static Pair<Descriptor, String> fromFilename(File directory, String name)
+ /**
+ * Parse a sstable filename into a Descriptor.
+ * <p>
+ * SSTable files are all located within subdirectories of the form {@code <keyspace>/<table>/}. Normal sstables are
+ * directly within that subdirectory structure, while secondary indexes, backups and snapshots are each inside an
+ * additional subdirectory. The files themselves have the form:
+ * {@code <version>-<gen>-<format>-<component>}.
+ * <p>
+ * Note that this method will only successfully parse sstable files of supported versions.
+ *
+ * @param file the {@code File} object for the filename to parse.
+ * @return the descriptor for the parsed file.
+ *
+ * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could
+ * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+ * version.
+ */
+ public static Descriptor fromFilename(File file)
{
- return fromFilename(directory, name, false);
+ return fromFilenameWithComponent(file).left;
}
/**
- * Filename of the form is vary by version:
+ * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part.
*
- * <ul>
- * <li><ksname>-<cfname>-(tmp-)?<version>-<gen>-<component> for cassandra 2.0 and before</li>
- * <li>(<tmp marker>-)?<version>-<gen>-<component> for cassandra 3.0 and later</li>
- * </ul>
+ * @param file the {@code File} object for the filename to parse.
+ * @return a pair of the descriptor and component corresponding to the provided {@code file}.
*
- * If this is for SSTable of secondary index, directory should ends with index name for 2.1+.
- *
- * @param directory The directory of the SSTable files
- * @param name The name of the SSTable file
- * @param skipComponent true if the name param should not be parsed for a component tag
- *
- * @return A Descriptor for the SSTable, and the Component remainder.
+ * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could
+ * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+ * version.
*/
- public static Pair<Descriptor, String> fromFilename(File directory, String name, boolean skipComponent)
+ public static Pair<Descriptor, Component> fromFilenameWithComponent(File file)
{
- File parentDirectory = directory != null ? directory : new File(".");
+ // We need to extract the keyspace and table names from the parent directories, so make sure we deal with the
+ // absolute path.
+ if (!file.isAbsolute())
+ file = file.getAbsoluteFile();
- // tokenize the filename
- StringTokenizer st = new StringTokenizer(name, String.valueOf(separator));
- String nexttok;
+ String name = file.getName();
+ List<String> tokens = filenameSplitter.splitToList(name);
+ int size = tokens.size();
- // read tokens backwards to determine version
- Deque<String> tokenStack = new ArrayDeque<>();
- while (st.hasMoreTokens())
+ if (size != 4)
{
- tokenStack.push(st.nextToken());
+ // This is an invalid sstable file for this version. But to provide a more helpful error message, we detect
+ // old format sstable, which had the format:
+ // <keyspace>-<table>-(tmp-)?<version>-<gen>-<component>
+ // Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect
+ // but we're just trying to be helpful, not perfect.
+ if (size == 5 || size == 6)
+ throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.",
+ name,
+ tokens.get(size - 3)));
+ throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name));
}
- // component suffix
- String component = skipComponent ? null : tokenStack.pop();
+ String versionString = tokens.get(0);
+ if (!Version.validate(versionString))
+ throw invalidSSTable(name, "invalid version %s", versionString);
- nexttok = tokenStack.pop();
- // generation OR format type
- SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
- if (!CharMatcher.DIGIT.matchesAllOf(nexttok))
+ int generation;
+ try
{
- fmt = SSTableFormat.Type.validate(nexttok);
- nexttok = tokenStack.pop();
+ generation = Integer.parseInt(tokens.get(1));
+ }
+ catch (NumberFormatException e)
+ {
+ throw invalidSSTable(name, "the 'generation' part of the name doesn't parse as a number");
}
- // generation
- int generation = Integer.parseInt(nexttok);
+ String formatString = tokens.get(2);
+ SSTableFormat.Type format;
+ try
+ {
+ format = SSTableFormat.Type.validate(formatString);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw invalidSSTable(name, "unknown 'format' part (%s)", formatString);
+ }
- // version
- nexttok = tokenStack.pop();
+ Component component = Component.parse(tokens.get(3));
- if (!Version.validate(nexttok))
- throw new UnsupportedOperationException("SSTable " + name + " is too old to open. Upgrade to 2.0 first, and run upgradesstables");
+ Version version = format.info.getVersion(versionString);
+ if (!version.isCompatible())
+ throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString);
- Version version = fmt.info.getVersion(nexttok);
+ File directory = parentOf(name, file);
+ File tableDir = directory;
- // ks/cf names
- String ksname, cfname;
- if (version.hasNewFileName())
+ // Check if it's a secondary index directory (note that this doesn't exclude it from also being a backup or snapshot)
+ String indexName = "";
+ if (tableDir.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
{
- // for 2.1+ read ks and cf names from directory
- File cfDirectory = parentDirectory;
- // check if this is secondary index
- String indexName = "";
- if (cfDirectory.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
- {
- indexName = cfDirectory.getName();
- cfDirectory = cfDirectory.getParentFile();
- }
- if (cfDirectory.getName().equals(Directories.BACKUPS_SUBDIR))
- {
- cfDirectory = cfDirectory.getParentFile();
- }
- else if (cfDirectory.getParentFile().getName().equals(Directories.SNAPSHOT_SUBDIR))
- {
- cfDirectory = cfDirectory.getParentFile().getParentFile();
- }
- cfname = cfDirectory.getName().split("-")[0] + indexName;
- ksname = cfDirectory.getParentFile().getName();
+ indexName = tableDir.getName();
+ tableDir = parentOf(name, tableDir);
}
- else
- {
- cfname = tokenStack.pop();
- ksname = tokenStack.pop();
- }
- assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory;
- return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt,
- // _assume_ version from version
- Component.digestFor(version.uncompressedChecksumType())),
- component);
+ // Then it can be a backup or a snapshot
+ if (tableDir.getName().equals(Directories.BACKUPS_SUBDIR))
+ tableDir = tableDir.getParentFile();
+ else if (parentOf(name, tableDir).getName().equals(Directories.SNAPSHOT_SUBDIR))
+ tableDir = parentOf(name, parentOf(name, tableDir));
+
+ String table = tableDir.getName().split("-")[0] + indexName;
+ String keyspace = parentOf(name, tableDir).getName();
+
+ return Pair.create(new Descriptor(version, directory, keyspace, table, generation, format), component);
+ }
+
+ private static File parentOf(String name, File file)
+ {
+ File parent = file.getParentFile();
+ if (parent == null)
+ throw invalidSSTable(name, "cannot extract keyspace and table name; make sure the sstable is in the proper sub-directories");
+ return parent;
+ }
+
+ private static IllegalArgumentException invalidSSTable(String name, String msgFormat, Object... parameters)
+ {
+ throw new IllegalArgumentException(String.format("Invalid sstable file " + name + ": " + msgFormat, parameters));
}
public IMetadataSerializer getMetadataSerializer()
{
- if (version.hasNewStatsFile())
- return new MetadataSerializer();
- else
- return new LegacyMetadataSerializer();
+ return new MetadataSerializer();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
index 9ee1996..03246c5 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.io.sstable;
import java.io.IOException;
+import java.util.List;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -79,6 +82,11 @@ public class IndexInfo
this.endOpenMarker = endOpenMarker;
}
+ public static IndexInfo.Serializer serializer(Version version, SerializationHeader header)
+ {
+ return new IndexInfo.Serializer(version, header.clusteringTypes());
+ }
+
public static class Serializer implements ISerializer<IndexInfo>
{
// This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding.
@@ -87,21 +95,19 @@ public class IndexInfo
// size so using the default is almost surely better than using no base at all.
public static final long WIDTH_BASE = 64 * 1024;
- private final ISerializer<ClusteringPrefix> clusteringSerializer;
- private final Version version;
+ private final int version;
+ private final List<AbstractType<?>> clusteringTypes;
- public Serializer(Version version, ISerializer<ClusteringPrefix> clusteringSerializer)
+ public Serializer(Version version, List<AbstractType<?>> clusteringTypes)
{
- this.clusteringSerializer = clusteringSerializer;
- this.version = version;
+ this.version = version.correspondingMessagingVersion();
+ this.clusteringTypes = clusteringTypes;
}
public void serialize(IndexInfo info, DataOutputPlus out) throws IOException
{
- assert version.storeRows() : "We read old index files but we should never write them";
-
- clusteringSerializer.serialize(info.firstName, out);
- clusteringSerializer.serialize(info.lastName, out);
+ ClusteringPrefix.serializer.serialize(info.firstName, out, version, clusteringTypes);
+ ClusteringPrefix.serializer.serialize(info.lastName, out, version, clusteringTypes);
out.writeUnsignedVInt(info.offset);
out.writeVInt(info.width - WIDTH_BASE);
@@ -112,53 +118,33 @@ public class IndexInfo
public void skip(DataInputPlus in) throws IOException
{
- clusteringSerializer.skip(in);
- clusteringSerializer.skip(in);
- if (version.storeRows())
- {
- in.readUnsignedVInt();
- in.readVInt();
- if (in.readBoolean())
- DeletionTime.serializer.skip(in);
- }
- else
- {
- in.skipBytes(TypeSizes.sizeof(0L));
- in.skipBytes(TypeSizes.sizeof(0L));
- }
+ ClusteringPrefix.serializer.skip(in, version, clusteringTypes);
+ ClusteringPrefix.serializer.skip(in, version, clusteringTypes);
+ in.readUnsignedVInt();
+ in.readVInt();
+ if (in.readBoolean())
+ DeletionTime.serializer.skip(in);
}
public IndexInfo deserialize(DataInputPlus in) throws IOException
{
- ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
- ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
- long offset;
- long width;
+ ClusteringPrefix firstName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes);
+ ClusteringPrefix lastName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes);
+ long offset = in.readUnsignedVInt();
+ long width = in.readVInt() + WIDTH_BASE;
DeletionTime endOpenMarker = null;
- if (version.storeRows())
- {
- offset = in.readUnsignedVInt();
- width = in.readVInt() + WIDTH_BASE;
- if (in.readBoolean())
- endOpenMarker = DeletionTime.serializer.deserialize(in);
- }
- else
- {
- offset = in.readLong();
- width = in.readLong();
- }
+ if (in.readBoolean())
+ endOpenMarker = DeletionTime.serializer.deserialize(in);
return new IndexInfo(firstName, lastName, offset, width, endOpenMarker);
}
public long serializedSize(IndexInfo info)
{
- assert version.storeRows() : "We read old index files but we should never write them";
-
- long size = clusteringSerializer.serializedSize(info.firstName)
- + clusteringSerializer.serializedSize(info.lastName)
- + TypeSizes.sizeofUnsignedVInt(info.offset)
- + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
- + TypeSizes.sizeof(info.endOpenMarker != null);
+ long size = ClusteringPrefix.serializer.serializedSize(info.firstName, version, clusteringTypes)
+ + ClusteringPrefix.serializer.serializedSize(info.lastName, version, clusteringTypes)
+ + TypeSizes.sizeofUnsignedVInt(info.offset)
+ + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
+ + TypeSizes.sizeof(info.endOpenMarker != null);
if (info.endOpenMarker != null)
size += DeletionTime.serializer.serializedSize(info.endOpenMarker);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 6de3478..303adfd 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -268,16 +268,13 @@ public class IndexSummary extends WrappedSharedCloseable
public static class IndexSummarySerializer
{
- public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
+ public void serialize(IndexSummary t, DataOutputPlus out) throws IOException
{
out.writeInt(t.minIndexInterval);
out.writeInt(t.offsetCount);
out.writeLong(t.getOffHeapSize());
- if (withSamplingLevel)
- {
- out.writeInt(t.samplingLevel);
- out.writeInt(t.sizeAtFullSampling);
- }
+ out.writeInt(t.samplingLevel);
+ out.writeInt(t.sizeAtFullSampling);
// our on-disk representation treats the offsets and the summary data as one contiguous structure,
// in which the offsets are based from the start of the structure. i.e., if the offsets occupy
// X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
@@ -297,7 +294,7 @@ public class IndexSummary extends WrappedSharedCloseable
}
@SuppressWarnings("resource")
- public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
+ public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
{
int minIndexInterval = in.readInt();
if (minIndexInterval != expectedMinIndexInterval)
@@ -308,17 +305,8 @@ public class IndexSummary extends WrappedSharedCloseable
int offsetCount = in.readInt();
long offheapSize = in.readLong();
- int samplingLevel, fullSamplingSummarySize;
- if (haveSamplingLevel)
- {
- samplingLevel = in.readInt();
- fullSamplingSummarySize = in.readInt();
- }
- else
- {
- samplingLevel = BASE_SAMPLING_LEVEL;
- fullSamplingSummarySize = offsetCount;
- }
+ int samplingLevel = in.readInt();
+ int fullSamplingSummarySize = in.readInt();
int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval);
if (effectiveIndexInterval > maxIndexInterval)
@@ -355,13 +343,12 @@ public class IndexSummary extends WrappedSharedCloseable
*
* Only for use by offline tools like SSTableMetadataViewer, otherwise SSTable.first/last should be used.
*/
- public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
+ public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner) throws IOException
{
in.skipBytes(4); // minIndexInterval
int offsetCount = in.readInt();
long offheapSize = in.readLong();
- if (haveSamplingLevel)
- in.skipBytes(8); // samplingLevel, fullSamplingSummarySize
+ in.skipBytes(8); // samplingLevel, fullSamplingSummarySize
in.skip(offsetCount * 4);
in.skip(offheapSize - offsetCount * 4);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 8fb4835..fc326dc 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -73,21 +73,9 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
public List<SSTableReader> redistributeSummaries() throws IOException
{
logger.info("Redistributing index summaries");
- List<SSTableReader> oldFormatSSTables = new ArrayList<>();
List<SSTableReader> redistribute = new ArrayList<>();
for (LifecycleTransaction txn : transactions.values())
{
- for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
- {
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel())
- {
- oldFormatSSTables.add(sstable);
- txn.cancel(sstable);
- }
- }
redistribute.addAll(txn.originals());
}
@@ -119,7 +107,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
long remainingBytes = memoryPoolBytes;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+ for (SSTableReader sstable : compacting)
remainingBytes -= sstable.getIndexSummaryOffHeapSize();
logger.trace("Index summaries for compacting SSTables are using {} MB of space",
@@ -130,7 +118,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
txn.finish();
total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+ for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
total += sstable.getIndexSummaryOffHeapSize();
logger.trace("Completed resizing of index summaries; current approximate memory used: {}",
FBUtilities.prettyPrintMemory(total));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 601f5a0..8556cfa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -168,14 +168,40 @@ public abstract class SSTable
}
/**
- * @return Descriptor and Component pair. null if given file is not acceptable as SSTable component.
- * If component is of unknown type, returns CUSTOM component.
+ * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object.
+ *
+ * @param file the filename to parse.
+ * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to
+ * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be
+ * returned as CUSTOM ones.
+ */
+ public static Pair<Descriptor, Component> tryComponentFromFilename(File file)
+ {
+ try
+ {
+ return Descriptor.fromFilenameWithComponent(file);
+ }
+ catch (Throwable e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Parse a sstable filename into a {@link Descriptor} object.
+ * <p>
+ * Note that this method ignores the component part of the filename; if this is not what you want, use
+ * {@link #tryComponentFromFilename} instead.
+ *
+ * @param file the filename to parse.
+ * @return the {@code Descriptor} corresponding to {@code file} if it corresponds to a valid and supported sstable
+ * filename, {@code null} otherwise.
*/
- public static Pair<Descriptor, Component> tryComponentFromFilename(File dir, String name)
+ public static Descriptor tryDescriptorFromFilename(File file)
{
try
{
- return Component.fromFilename(dir, name);
+ return Descriptor.fromFilename(file);
}
catch (Throwable e)
{
@@ -218,17 +244,9 @@ public abstract class SSTable
Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
for (Component.Type componentType : knownTypes)
{
- if (componentType == Component.Type.DIGEST)
- {
- if (desc.digestComponent != null && new File(desc.filenameFor(desc.digestComponent)).exists())
- components.add(desc.digestComponent);
- }
- else
- {
- Component component = new Component(componentType);
- if (new File(desc.filenameFor(component)).exists())
- components.add(component);
- }
+ Component component = new Component(componentType);
+ if (new File(desc.filenameFor(component)).exists())
+ components.add(component);
}
return components;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 043f6fa..e00de4a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -85,7 +85,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
- Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
+ Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file);
Descriptor desc = p == null ? null : p.left;
if (p == null || !p.right.equals(Component.DATA))
return false;