You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:50:00 UTC
[07/11] cassandra git commit: Remove pre-3.0 compatibility code for 4.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index ce42126..ad0f3c9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -54,18 +54,12 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
public static SSTableSimpleIterator create(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
{
- if (helper.version < MessagingService.VERSION_30)
- return new OldFormatIterator(metadata, in, helper, partitionDeletion);
- else
- return new CurrentFormatIterator(metadata, in, header, helper);
+ return new CurrentFormatIterator(metadata, in, header, helper);
}
public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
{
- if (helper.version < MessagingService.VERSION_30)
- return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion);
- else
- return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
+ return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
}
public abstract Row readStaticRow() throws IOException;
@@ -136,106 +130,4 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
}
}
}
-
- private static class OldFormatIterator extends SSTableSimpleIterator
- {
- private final UnfilteredDeserializer deserializer;
-
- private OldFormatIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion)
- {
- super(metadata, in, helper);
- // We use an UnfilteredDeserializer because even though we don't need all it's fanciness, it happens to handle all
- // the details we need for reading the old format.
- this.deserializer = UnfilteredDeserializer.create(metadata, in, null, helper, partitionDeletion, false);
- }
-
- public Row readStaticRow() throws IOException
- {
- if (metadata.isCompactTable())
- {
- // For static compact tables, in the old format, static columns are intermingled with the other columns, so we
- // need to extract them. Which imply 2 passes (one to extract the static, then one for other value).
- if (metadata.isStaticCompactTable())
- {
- assert in instanceof RewindableDataInput;
- RewindableDataInput file = (RewindableDataInput)in;
- DataPosition mark = file.mark();
- Row staticRow = LegacyLayout.extractStaticColumns(metadata, file, metadata.partitionColumns().statics);
- file.reset(mark);
-
- // We've extracted the static columns, so we must ignore them on the 2nd pass
- ((UnfilteredDeserializer.OldFormatDeserializer)deserializer).setSkipStatic();
- return staticRow;
- }
- else
- {
- return Rows.EMPTY_STATIC_ROW;
- }
- }
-
- return deserializer.hasNext() && deserializer.nextIsStatic()
- ? (Row)deserializer.readNext()
- : Rows.EMPTY_STATIC_ROW;
-
- }
-
- protected Unfiltered computeNext()
- {
- while (true)
- {
- try
- {
- if (!deserializer.hasNext())
- return endOfData();
-
- Unfiltered unfiltered = deserializer.readNext();
- if (metadata.isStaticCompactTable() && unfiltered.kind() == Unfiltered.Kind.ROW)
- {
- Row row = (Row) unfiltered;
- ColumnDefinition def = metadata.getColumnDefinition(LegacyLayout.encodeClustering(metadata, row.clustering()));
- if (def != null && def.isStatic())
- continue;
- }
- return unfiltered;
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
- }
-
- }
-
- private static class OldFormatTombstoneIterator extends OldFormatIterator
- {
- private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion)
- {
- super(metadata, in, helper, partitionDeletion);
- }
-
- public Row readStaticRow() throws IOException
- {
- Row row = super.readStaticRow();
- if (!row.deletion().isLive())
- return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
- return Rows.EMPTY_STATIC_ROW;
- }
-
- protected Unfiltered computeNext()
- {
- while (true)
- {
- Unfiltered unfiltered = super.computeNext();
- if (unfiltered == null || unfiltered.isRangeTombstoneMarker())
- return unfiltered;
-
- Row row = (Row) unfiltered;
- if (!row.deletion().isLive())
- return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
- // Otherwise read next.
- }
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 015c5bb..323b1bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -148,14 +148,8 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
return new SSTableTxnWriter(txn, writer);
}
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, SerializationHeader header)
{
- Descriptor desc = Descriptor.fromFilename(filename);
- return create(cfs, desc, keyCount, repairedAt, sstableLevel, header);
- }
-
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header)
- {
- return create(cfs, filename, keyCount, repairedAt, 0, header);
+ return create(cfs, desc, keyCount, repairedAt, 0, header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 3665da7..89c064b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -68,7 +68,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to store %s",
FBUtilities.prettyPrintMemory(totalSize)));
- Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+ Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
}
}
@@ -90,7 +90,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
if (currentWriter != null)
finishedWriters.add(currentWriter);
- Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+ Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format);
currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 4391946..29e29ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -41,10 +41,6 @@ public interface SSTableFormat
public static enum Type
{
- //Used internally to refer to files with no
- //format flag in the filename
- LEGACY("big", BigFormat.instance),
-
//The original sstable format
BIG("big", BigFormat.instance);
@@ -70,10 +66,6 @@ public interface SSTableFormat
{
for (Type valid : Type.values())
{
- //This is used internally for old sstables
- if (valid == LEGACY)
- continue;
-
if (valid.name.equalsIgnoreCase(name))
return valid;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 1a2e1b0..add8ddc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -253,58 +253,48 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
long count = -1;
- // check if cardinality estimator is available for all SSTables
- boolean cardinalityAvailable = !Iterables.isEmpty(sstables) && Iterables.all(sstables, new Predicate<SSTableReader>()
- {
- public boolean apply(SSTableReader sstable)
- {
- return sstable.descriptor.version.hasNewStatsFile();
- }
- });
+ if (Iterables.isEmpty(sstables))
+ return count;
- // if it is, load them to estimate key count
- if (cardinalityAvailable)
+ boolean failed = false;
+ ICardinality cardinality = null;
+ for (SSTableReader sstable : sstables)
{
- boolean failed = false;
- ICardinality cardinality = null;
- for (SSTableReader sstable : sstables)
- {
- if (sstable.openReason == OpenReason.EARLY)
- continue;
-
- try
- {
- CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
- // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index
- // summary. (CASSANDRA-10676)
- if (metadata == null)
- {
- logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename());
- failed = true;
- break;
- }
+ if (sstable.openReason == OpenReason.EARLY)
+ continue;
- if (cardinality == null)
- cardinality = metadata.cardinalityEstimator;
- else
- cardinality = cardinality.merge(metadata.cardinalityEstimator);
- }
- catch (IOException e)
- {
- logger.warn("Reading cardinality from Statistics.db failed.", e);
- failed = true;
- break;
- }
- catch (CardinalityMergeException e)
+ try
+ {
+ CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
+ // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index
+ // summary. (CASSANDRA-10676)
+ if (metadata == null)
{
- logger.warn("Cardinality merge failed.", e);
+ logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename());
failed = true;
break;
}
+
+ if (cardinality == null)
+ cardinality = metadata.cardinalityEstimator;
+ else
+ cardinality = cardinality.merge(metadata.cardinalityEstimator);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Reading cardinality from Statistics.db failed.", e);
+ failed = true;
+ break;
+ }
+ catch (CardinalityMergeException e)
+ {
+ logger.warn("Cardinality merge failed.", e);
+ failed = true;
+ break;
}
- if (cardinality != null && !failed)
- count = cardinality.cardinality();
}
+ if (cardinality != null && !failed)
+ count = cardinality.cardinality();
// if something went wrong above or cardinality is not available, calculate using index summary
if (count < 0)
@@ -481,14 +471,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
// For the 3.0+ sstable format, the (misnomed) stats component hold the serialization header which we need to deserialize the sstable content
- assert !descriptor.version.storeRows() || components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor;
+ assert components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor;
EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types);
ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
- assert !descriptor.version.storeRows() || header != null;
+ assert header != null;
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
@@ -730,7 +720,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
// bf is enabled and fp chance matches the currently configured value.
load(false, true);
- loadBloomFilter(descriptor.version.hasOldBfHashOrder());
+ loadBloomFilter();
}
}
@@ -739,11 +729,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*
* @throws IOException
*/
- private void loadBloomFilter(boolean oldBfHashOrder) throws IOException
+ private void loadBloomFilter() throws IOException
{
try (DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))))
{
- bf = FilterFactory.deserialize(stream, true, oldBfHashOrder);
+ bf = FilterFactory.deserialize(stream, true);
}
}
@@ -829,7 +819,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
: estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
if (recreateBloomFilter)
- bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder());
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance, true);
try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, samplingLevel))
{
@@ -883,7 +873,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
indexSummary = IndexSummary.serializer.deserialize(
- iStream, getPartitioner(), descriptor.version.hasSamplingLevel(),
+ iStream, getPartitioner(),
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
first = decorateKey(ByteBufferUtil.readWithLength(iStream));
last = decorateKey(ByteBufferUtil.readWithLength(iStream));
@@ -932,7 +922,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
{
- IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
+ IndexSummary.serializer.serialize(summary, oStream);
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
}
@@ -1106,8 +1096,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
@SuppressWarnings("resource")
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
- assert descriptor.version.hasSamplingLevel();
-
synchronized (tidy.global)
{
assert openReason != OpenReason.EARLY;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 9fb5f7c..874c679 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -127,26 +127,14 @@ public abstract class SSTableWriter extends SSTable implements Transactional
return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
}
- public static SSTableWriter create(String filename,
- long keyCount,
- long repairedAt,
- int sstableLevel,
- SerializationHeader header,
- Collection<Index> indexes,
- LifecycleTransaction txn)
- {
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn);
- }
-
@VisibleForTesting
- public static SSTableWriter create(String filename,
+ public static SSTableWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- Descriptor descriptor = Descriptor.fromFilename(filename);
return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
}
@@ -157,7 +145,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
Component.STATS,
Component.SUMMARY,
Component.TOC,
- Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType())));
+ Component.DIGEST));
if (metadata.params.bloomFilterFpChance < 1.0)
components.add(Component.FILTER);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 96c5a6e..b78e434 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -46,30 +46,8 @@ public abstract class Version
public abstract boolean isLatestVersion();
- public abstract boolean hasSamplingLevel();
-
- public abstract boolean hasNewStatsFile();
-
- public abstract ChecksumType compressedChecksumType();
-
- public abstract ChecksumType uncompressedChecksumType();
-
- public abstract boolean hasRepairedAt();
-
- public abstract boolean tracksLegacyCounterShards();
-
- public abstract boolean hasNewFileName();
-
- public abstract boolean storeRows();
-
public abstract int correspondingMessagingVersion(); // Only use by storage that 'storeRows' so far
- public abstract boolean hasOldBfHashOrder();
-
- public abstract boolean hasCompactionAncestors();
-
- public abstract boolean hasBoundaries();
-
public abstract boolean hasCommitLogLowerBound();
public abstract boolean hasCommitLogIntervals();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 3846194..980eed0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -111,16 +111,8 @@ public class BigFormat implements SSTableFormat
static class BigVersion extends Version
{
public static final String current_version = "mc";
- public static final String earliest_supported_version = "jb";
+ public static final String earliest_supported_version = "ma";
- // jb (2.0.1): switch from crc32 to adler32 for compression checksums
- // checksum the compressed data
- // ka (2.1.0): new Statistics.db file format
- // index summaries can be downsampled and the sampling level is persisted
- // switch uncompressed checksums to adler32
- // tracks presense of legacy (local and remote) counter shards
- // la (2.2.0): new file name format
- // lb (2.2.7): commit log lower bound included
// ma (3.0.0): swap bf hash order
// store rows natively
// mb (3.0.7, 3.7): commit log lower bound included
@@ -129,62 +121,17 @@ public class BigFormat implements SSTableFormat
// NOTE: when adding a new version, please add that to LegacySSTableTest, too.
private final boolean isLatestVersion;
- private final boolean hasSamplingLevel;
- private final boolean newStatsFile;
- private final ChecksumType compressedChecksumType;
- private final ChecksumType uncompressedChecksumType;
- private final boolean hasRepairedAt;
- private final boolean tracksLegacyCounterShards;
- private final boolean newFileName;
- public final boolean storeRows;
- public final int correspondingMessagingVersion; // Only use by storage that 'storeRows' so far
- public final boolean hasBoundaries;
- /**
- * CASSANDRA-8413: 3.0 bloom filter representation changed (two longs just swapped)
- * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution.
- */
- private final boolean hasOldBfHashOrder;
+ public final int correspondingMessagingVersion;
private final boolean hasCommitLogLowerBound;
private final boolean hasCommitLogIntervals;
- /**
- * CASSANDRA-7066: compaction ancerstors are no longer used and have been removed.
- */
- private final boolean hasCompactionAncestors;
-
BigVersion(String version)
{
super(instance, version);
isLatestVersion = version.compareTo(current_version) == 0;
- hasSamplingLevel = version.compareTo("ka") >= 0;
- newStatsFile = version.compareTo("ka") >= 0;
-
- //For a while Adler32 was in use, now the CRC32 instrinsic is very good especially after Haswell
- //PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684
- ChecksumType checksumType = ChecksumType.CRC32;
- if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0)
- checksumType = ChecksumType.Adler32;
- this.uncompressedChecksumType = checksumType;
-
- checksumType = ChecksumType.CRC32;
- if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0)
- checksumType = ChecksumType.Adler32;
- this.compressedChecksumType = checksumType;
-
- hasRepairedAt = version.compareTo("ka") >= 0;
- tracksLegacyCounterShards = version.compareTo("ka") >= 0;
+ correspondingMessagingVersion = MessagingService.VERSION_30;
- newFileName = version.compareTo("la") >= 0;
-
- hasOldBfHashOrder = version.compareTo("ma") < 0;
- hasCompactionAncestors = version.compareTo("ma") < 0;
- storeRows = version.compareTo("ma") >= 0;
- correspondingMessagingVersion = storeRows
- ? MessagingService.VERSION_30
- : MessagingService.VERSION_21;
-
- hasBoundaries = version.compareTo("ma") < 0;
hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
|| version.compareTo("mb") >= 0;
hasCommitLogIntervals = version.compareTo("mc") >= 0;
@@ -197,60 +144,6 @@ public class BigFormat implements SSTableFormat
}
@Override
- public boolean hasSamplingLevel()
- {
- return hasSamplingLevel;
- }
-
- @Override
- public boolean hasNewStatsFile()
- {
- return newStatsFile;
- }
-
- @Override
- public ChecksumType compressedChecksumType()
- {
- return compressedChecksumType;
- }
-
- @Override
- public ChecksumType uncompressedChecksumType()
- {
- return uncompressedChecksumType;
- }
-
- @Override
- public boolean hasRepairedAt()
- {
- return hasRepairedAt;
- }
-
- @Override
- public boolean tracksLegacyCounterShards()
- {
- return tracksLegacyCounterShards;
- }
-
- @Override
- public boolean hasOldBfHashOrder()
- {
- return hasOldBfHashOrder;
- }
-
- @Override
- public boolean hasCompactionAncestors()
- {
- return hasCompactionAncestors;
- }
-
- @Override
- public boolean hasNewFileName()
- {
- return newFileName;
- }
-
- @Override
public boolean hasCommitLogLowerBound()
{
return hasCommitLogLowerBound;
@@ -263,24 +156,12 @@ public class BigFormat implements SSTableFormat
}
@Override
- public boolean storeRows()
- {
- return storeRows;
- }
-
- @Override
public int correspondingMessagingVersion()
{
return correspondingMessagingVersion;
}
@Override
- public boolean hasBoundaries()
- {
- return hasBoundaries;
- }
-
- @Override
public boolean isCompatible()
{
return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c3139a3..018edac 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -84,7 +84,7 @@ public class BigTableWriter extends SSTableWriter
{
dataFile = new CompressedSequentialWriter(new File(getFilename()),
descriptor.filenameFor(Component.COMPRESSION_INFO),
- new File(descriptor.filenameFor(descriptor.digestComponent)),
+ new File(descriptor.filenameFor(Component.DIGEST)),
writerOption,
metadata.params.compression,
metadataCollector);
@@ -93,7 +93,7 @@ public class BigTableWriter extends SSTableWriter
{
dataFile = new ChecksummedSequentialWriter(new File(getFilename()),
new File(descriptor.filenameFor(Component.CRC)),
- new File(descriptor.filenameFor(descriptor.digestComponent)),
+ new File(descriptor.filenameFor(Component.DIGEST)),
writerOption);
}
dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
@@ -442,7 +442,7 @@ public class BigTableWriter extends SSTableWriter
builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
chunkCache.ifPresent(builder::withChunkCache);
summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
- bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder());
+ bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true);
// register listeners to be alerted when the data files are flushed
indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset()));
dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index ef3453a..c9dfe39 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -75,30 +75,17 @@ public class CompactionMetadata extends MetadataComponent
public int serializedSize(Version version, CompactionMetadata component) throws IOException
{
int sz = 0;
- if (version.hasCompactionAncestors())
- { // write empty ancestor marker
- sz = 4;
- }
byte[] serializedCardinality = component.cardinalityEstimator.getBytes();
return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz;
}
public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException
{
- if (version.hasCompactionAncestors())
- { // write empty ancestor marker
- out.writeInt(0);
- }
ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
}
public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException
{
- if (version.hasCompactionAncestors())
- { // skip ancestors
- int nbAncestors = in.readInt();
- in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors));
- }
ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt()));
return new CompactionMetadata(cardinality);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
deleted file mode 100644
index 6cc33f5..0000000
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.metadata;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.commitlog.CommitLogPosition;
-import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.StreamingHistogram;
-
-import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.commitLogPositionSetSerializer;
-
-/**
- * Serializer for SSTable from legacy versions
- */
-@Deprecated
-public class LegacyMetadataSerializer extends MetadataSerializer
-{
- /**
- * Legacy serialization is only used for SSTable level reset.
- */
- @Override
- public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out, Version version) throws IOException
- {
- ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
- StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
- CompactionMetadata compaction = (CompactionMetadata) components.get(MetadataType.COMPACTION);
-
- assert validation != null && stats != null && compaction != null && validation.partitioner != null;
-
- EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
- EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- CommitLogPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
- out.writeLong(stats.minTimestamp);
- out.writeLong(stats.maxTimestamp);
- out.writeInt(stats.maxLocalDeletionTime);
- out.writeDouble(validation.bloomFilterFPChance);
- out.writeDouble(stats.compressionRatio);
- out.writeUTF(validation.partitioner);
- out.writeInt(0); // compaction ancestors
- StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
- out.writeInt(stats.sstableLevel);
- out.writeInt(stats.minClusteringValues.size());
- for (ByteBuffer value : stats.minClusteringValues)
- ByteBufferUtil.writeWithShortLength(value, out);
- out.writeInt(stats.maxClusteringValues.size());
- for (ByteBuffer value : stats.maxClusteringValues)
- ByteBufferUtil.writeWithShortLength(value, out);
- if (version.hasCommitLogLowerBound())
- CommitLogPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
- if (version.hasCommitLogIntervals())
- commitLogPositionSetSerializer.serialize(stats.commitLogIntervals, out);
- }
-
- /**
- * Legacy serializer deserialize all components no matter what types are specified.
- */
- @Override
- public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
- {
- Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
-
- File statsFile = new File(descriptor.filenameFor(Component.STATS));
- if (!statsFile.exists() && types.contains(MetadataType.STATS))
- {
- components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata());
- }
- else
- {
- try (DataInputStreamPlus in = new DataInputStreamPlus(new BufferedInputStream(new FileInputStream(statsFile))))
- {
- EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
- EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
- CommitLogPosition commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
- long minTimestamp = in.readLong();
- long maxTimestamp = in.readLong();
- int maxLocalDeletionTime = in.readInt();
- double bloomFilterFPChance = in.readDouble();
- double compressionRatio = in.readDouble();
- String partitioner = in.readUTF();
- int nbAncestors = in.readInt(); //skip compaction ancestors
- in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors));
- StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
- int sstableLevel = 0;
- if (in.available() > 0)
- sstableLevel = in.readInt();
-
- int colCount = in.readInt();
- List<ByteBuffer> minColumnNames = new ArrayList<>(colCount);
- for (int i = 0; i < colCount; i++)
- minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-
- colCount = in.readInt();
- List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
- for (int i = 0; i < colCount; i++)
- maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-
- if (descriptor.version.hasCommitLogLowerBound())
- commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
- IntervalSet<CommitLogPosition> commitLogIntervals;
- if (descriptor.version.hasCommitLogIntervals())
- commitLogIntervals = commitLogPositionSetSerializer.deserialize(in);
- else
- commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound);
-
- if (types.contains(MetadataType.VALIDATION))
- components.put(MetadataType.VALIDATION,
- new ValidationMetadata(partitioner, bloomFilterFPChance));
- if (types.contains(MetadataType.STATS))
- components.put(MetadataType.STATS,
- new StatsMetadata(partitionSizes,
- columnCounts,
- commitLogIntervals,
- minTimestamp,
- maxTimestamp,
- Integer.MAX_VALUE,
- maxLocalDeletionTime,
- 0,
- Integer.MAX_VALUE,
- compressionRatio,
- tombstoneHistogram,
- sstableLevel,
- minColumnNames,
- maxColumnNames,
- true,
- ActiveRepairService.UNREPAIRED_SSTABLE,
- -1,
- -1));
- if (types.contains(MetadataType.COMPACTION))
- components.put(MetadataType.COMPACTION,
- new CompactionMetadata(null));
- }
- }
- return components;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index c83c2cf..0f6434b 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -236,10 +236,7 @@ public class StatsMetadata extends MetadataComponent
size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE));
- if (version.storeRows())
- size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
- else
- size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
+ size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime);
size += TypeSizes.sizeof(component.sstableLevel);
// min column names
@@ -251,8 +248,7 @@ public class StatsMetadata extends MetadataComponent
for (ByteBuffer value : component.maxClusteringValues)
size += 2 + value.remaining(); // with short length
size += TypeSizes.sizeof(component.hasLegacyCounterShards);
- if (version.storeRows())
- size += 8 + 8; // totalColumnsSet, totalRows
+ size += 8 + 8; // totalColumnsSet, totalRows
if (version.hasCommitLogLowerBound())
size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE));
if (version.hasCommitLogIntervals())
@@ -267,14 +263,10 @@ public class StatsMetadata extends MetadataComponent
CommitLogPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
- if (version.storeRows())
- out.writeInt(component.minLocalDeletionTime);
+ out.writeInt(component.minLocalDeletionTime);
out.writeInt(component.maxLocalDeletionTime);
- if (version.storeRows())
- {
- out.writeInt(component.minTTL);
- out.writeInt(component.maxTTL);
- }
+ out.writeInt(component.minTTL);
+ out.writeInt(component.maxTTL);
out.writeDouble(component.compressionRatio);
StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
out.writeInt(component.sstableLevel);
@@ -287,11 +279,8 @@ public class StatsMetadata extends MetadataComponent
ByteBufferUtil.writeWithShortLength(value, out);
out.writeBoolean(component.hasLegacyCounterShards);
- if (version.storeRows())
- {
- out.writeLong(component.totalColumnsSet);
- out.writeLong(component.totalRows);
- }
+ out.writeLong(component.totalColumnsSet);
+ out.writeLong(component.totalRows);
if (version.hasCommitLogLowerBound())
CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
@@ -307,17 +296,14 @@ public class StatsMetadata extends MetadataComponent
commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
- // We use MAX_VALUE as that's the default value for "no deletion time"
- int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
+ int minLocalDeletionTime = in.readInt();
int maxLocalDeletionTime = in.readInt();
- int minTTL = version.storeRows() ? in.readInt() : 0;
- int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
+ int minTTL = in.readInt();
+ int maxTTL = in.readInt();
double compressionRatio = in.readDouble();
StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
int sstableLevel = in.readInt();
- long repairedAt = 0;
- if (version.hasRepairedAt())
- repairedAt = in.readLong();
+ long repairedAt = in.readLong();
int colCount = in.readInt();
List<ByteBuffer> minClusteringValues = new ArrayList<>(colCount);
@@ -329,12 +315,10 @@ public class StatsMetadata extends MetadataComponent
for (int i = 0; i < colCount; i++)
maxClusteringValues.add(ByteBufferUtil.readWithShortLength(in));
- boolean hasLegacyCounterShards = true;
- if (version.tracksLegacyCounterShards())
- hasLegacyCounterShards = in.readBoolean();
+ boolean hasLegacyCounterShards = in.readBoolean();
- long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
- long totalRows = version.storeRows() ? in.readLong() : -1L;
+ long totalColumnsSet = in.readLong();
+ long totalRows = in.readLong();
if (version.hasCommitLogLowerBound())
commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
index 8f00ce7..219f0eb 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.compress.CorruptBlockException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.utils.ChecksumType;
public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
{
@@ -142,7 +143,7 @@ public abstract class CompressedChunkReader extends AbstractReaderFileProxy impl
if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
compressed.rewind();
- int checksum = (int) metadata.checksumType.of(compressed);
+ int checksum = (int) ChecksumType.CRC32.of(compressed);
compressed.clear().limit(Integer.BYTES);
if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
@@ -204,7 +205,7 @@ public abstract class CompressedChunkReader extends AbstractReaderFileProxy impl
{
compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
- int checksum = (int) metadata.checksumType.of(compressedChunk);
+ int checksum = (int) ChecksumType.CRC32.of(compressedChunk);
compressedChunk.limit(compressedChunk.capacity());
if (compressedChunk.getInt() != checksum)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index cee23c9..91b189d 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -44,7 +44,7 @@ public class DataIntegrityMetadata
public ChecksumValidator(Descriptor descriptor) throws IOException
{
- this(descriptor.version.uncompressedChecksumType(),
+ this(ChecksumType.CRC32,
RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
descriptor.filenameFor(Component.DATA));
}
@@ -99,8 +99,8 @@ public class DataIntegrityMetadata
public FileDigestValidator(Descriptor descriptor) throws IOException
{
this.descriptor = descriptor;
- checksum = descriptor.version.uncompressedChecksumType().newInstance();
- digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.digestFor(descriptor.version.uncompressedChecksumType()))));
+ checksum = ChecksumType.CRC32.newInstance();
+ digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 9878590..53e53a4 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -86,9 +86,9 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos
{
try
{
- if (version < MessagingService.VERSION_20)
+ if (version < MessagingService.VERSION_30)
throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; "
- + "The earliest version supported is 2.0.0",
+ + "The earliest version supported is 3.0.0",
version));
receiveMessages();
@@ -155,18 +155,11 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos
if (compressed)
{
logger.trace("Upgrading incoming connection to be compressed");
- if (version < MessagingService.VERSION_21)
- {
- in = new DataInputStreamPlus(new SnappyInputStream(socket.getInputStream()));
- }
- else
- {
- LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
- Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum();
- in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(),
- decompressor,
- checksum));
- }
+ LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+ Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum();
+ in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(),
+ decompressor,
+ checksum));
}
else
{
@@ -183,11 +176,8 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos
private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException
{
- int id;
- if (version < MessagingService.VERSION_20)
- id = Integer.parseInt(input.readUTF());
- else
- id = input.readInt();
+ int id = input.readInt();
+
long currentTime = ApproximateTime.currentTimeMillis();
MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime));
if (message == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 4f41ee5..94019f2 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -104,7 +104,7 @@ public class MessageOut<T>
{
CompactEndpointSerializationHelper.serialize(from, out);
- out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).ordinal());
+ out.writeInt(verb.ordinal());
out.writeInt(parameters.size());
for (Map.Entry<String, byte[]> entry : parameters.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f82e80b..38c1cd2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -88,10 +88,6 @@ public final class MessagingService implements MessagingServiceMBean
public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
// 8 bits version, so don't waste versions
- public static final int VERSION_12 = 6;
- public static final int VERSION_20 = 7;
- public static final int VERSION_21 = 8;
- public static final int VERSION_22 = 9;
public static final int VERSION_30 = 10;
public static final int current_version = VERSION_30;
@@ -105,9 +101,6 @@ public final class MessagingService implements MessagingServiceMBean
*/
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
- private boolean allNodesAtLeast22 = true;
- private boolean allNodesAtLeast30 = true;
-
public final MessagingMetrics metrics = new MessagingMetrics();
/* All verb handler identifiers */
@@ -236,16 +229,6 @@ public final class MessagingService implements MessagingServiceMBean
UNUSED_5,
;
- // This is to support a "late" choice of the verb based on the messaging service version.
- // See CASSANDRA-12249 for more details.
- public static Verb convertForMessagingServiceVersion(Verb verb, int version)
- {
- if (verb == PAGED_RANGE && version >= VERSION_30)
- return RANGE_SLICE;
-
- return verb;
- }
-
public long getTimeout()
{
return DatabaseDescriptor.getRpcTimeout();
@@ -319,9 +302,9 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MUTATION, Mutation.serializer);
put(Verb.READ_REPAIR, Mutation.serializer);
- put(Verb.READ, ReadCommand.readSerializer);
- put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
- put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer);
+ put(Verb.READ, ReadCommand.serializer);
+ put(Verb.RANGE_SLICE, ReadCommand.serializer);
+ put(Verb.PAGED_RANGE, ReadCommand.serializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -350,8 +333,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.HINT, HintResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
- put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
- put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer);
+ put(Verb.RANGE_SLICE, ReadResponse.serializer);
+ put(Verb.PAGED_RANGE, ReadResponse.serializer);
put(Verb.READ, ReadResponse.serializer);
put(Verb.TRUNCATE, TruncateResponse.serializer);
put(Verb.SNAPSHOT, null);
@@ -1041,16 +1024,6 @@ public final class MessagingService implements MessagingServiceMBean
return packed >>> (start + 1) - count & ~(-1 << count);
}
- public boolean areAllNodesAtLeast22()
- {
- return allNodesAtLeast22;
- }
-
- public boolean areAllNodesAtLeast30()
- {
- return allNodesAtLeast30;
- }
-
/**
* @return the last version associated with address, or @param version if this is the first such version
*/
@@ -1058,50 +1031,16 @@ public final class MessagingService implements MessagingServiceMBean
{
// We can't talk to someone from the future
version = Math.min(version, current_version);
-
logger.trace("Setting version {} for {}", version, endpoint);
- if (version < VERSION_22)
- allNodesAtLeast22 = false;
- if (version < VERSION_30)
- allNodesAtLeast30 = false;
-
Integer v = versions.put(endpoint, version);
-
- // if the version was increased to 2.2 or later see if the min version across the cluster has changed
- if (v != null && (v < VERSION_30 && version >= VERSION_22))
- refreshAllNodeMinVersions();
-
return v == null ? version : v;
}
public void resetVersion(InetAddress endpoint)
{
logger.trace("Resetting version for {}", endpoint);
- Integer removed = versions.remove(endpoint);
- if (removed != null && removed <= VERSION_30)
- refreshAllNodeMinVersions();
- }
-
- private void refreshAllNodeMinVersions()
- {
- boolean anyNodeLowerThan30 = false;
- for (Integer version : versions.values())
- {
- if (version < MessagingService.VERSION_30)
- {
- anyNodeLowerThan30 = true;
- allNodesAtLeast30 = false;
- }
-
- if (version < MessagingService.VERSION_22)
- {
- allNodesAtLeast22 = false;
- return;
- }
- }
- allNodesAtLeast22 = true;
- allNodesAtLeast30 = !anyNodeLowerThan30;
+ versions.remove(endpoint);
}
public int getVersion(InetAddress endpoint)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1f47334..c32154e 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -336,11 +336,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
private void writeInternal(MessageOut message, int id, long timestamp) throws IOException
{
out.writeInt(MessagingService.PROTOCOL_MAGIC);
-
- if (targetVersion < MessagingService.VERSION_20)
- out.writeUTF(String.valueOf(id));
- else
- out.writeInt(id);
+ out.writeInt(id);
// int cast cuts off the high-order half of the timestamp, which we can assume remains
// the same between now and when the recipient reconstructs it.
@@ -427,9 +423,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
int maxTargetVersion = handshakeVersion(in);
if (maxTargetVersion == NO_VERSION)
{
- // no version is returned, so disconnect an try again: we will either get
- // a different target version (targetVersion < MessagingService.VERSION_12)
- // or if the same version the handshake will finally succeed
+ // no version is returned, so disconnect and try again
logger.trace("Target max version is {}; no version information yet, will retry", maxTargetVersion);
if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint()))
logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion);
@@ -461,22 +455,15 @@ public class OutboundTcpConnection extends FastThreadLocalThread
{
out.flush();
logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
- if (targetVersion < MessagingService.VERSION_21)
- {
- // Snappy is buffered, so no need for extra buffering output stream
- out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
- }
- else
- {
- // TODO: custom LZ4 OS that supports BB write methods
- LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
- Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
- out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
- 1 << 14, // 16k block size
- compressor,
- checksum,
- true)); // no async flushing
- }
+
+ // TODO: custom LZ4 OS that supports BB write methods
+ LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
+ Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
+ out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
+ 1 << 14, // 16k block size
+ compressor,
+ checksum,
+ true)); // no async flushing
}
logger.debug("Done connecting to {}", poolReference.endPoint());
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 05adbf9..be3daef 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -93,12 +93,10 @@ public class RepairJobDesc
{
public void serialize(RepairJobDesc desc, DataOutputPlus out, int version) throws IOException
{
- if (version >= MessagingService.VERSION_21)
- {
- out.writeBoolean(desc.parentSessionId != null);
- if (desc.parentSessionId != null)
- UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version);
- }
+ out.writeBoolean(desc.parentSessionId != null);
+ if (desc.parentSessionId != null)
+ UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version);
+
UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
out.writeUTF(desc.keyspace);
out.writeUTF(desc.columnFamily);
@@ -111,11 +109,8 @@ public class RepairJobDesc
public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException
{
UUID parentSessionId = null;
- if (version >= MessagingService.VERSION_21)
- {
- if (in.readBoolean())
- parentSessionId = UUIDSerializer.serializer.deserialize(in, version);
- }
+ if (in.readBoolean())
+ parentSessionId = UUIDSerializer.serializer.deserialize(in, version);
UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
@@ -136,13 +131,9 @@ public class RepairJobDesc
public long serializedSize(RepairJobDesc desc, int version)
{
- int size = 0;
- if (version >= MessagingService.VERSION_21)
- {
- size += TypeSizes.sizeof(desc.parentSessionId != null);
- if (desc.parentSessionId != null)
- size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version);
- }
+ int size = TypeSizes.sizeof(desc.parentSessionId != null);
+ if (desc.parentSessionId != null)
+ size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version);
size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
size += TypeSizes.sizeof(desc.keyspace);
size += TypeSizes.sizeof(desc.columnFamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index a2a2512..e20995e 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -218,7 +218,7 @@ public class Validator implements Runnable
validated++;
// MerkleTree uses XOR internally, so we want lots of output bits here
CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
- UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version);
+ UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version);
// only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
return digest.count > 0
? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)