You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/04 11:12:28 UTC

cassandra git commit: Fix startup error when upgrading nodes to 3.0

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 dc5b268d0 -> f009272ba


Fix startup error when upgrading nodes to 3.0

patch by slebresne; reviewed by blambov for CASSANDRA-10136


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f009272b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f009272b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f009272b

Branch: refs/heads/cassandra-3.0
Commit: f009272ba6ffb94579a443733f5cfe25ec9606fa
Parents: dc5b268
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 19 16:13:52 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 4 11:11:42 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/Serializers.java    | 65 ++++++++++++++++++--
 .../db/SinglePartitionNamesCommand.java         |  2 +-
 .../columniterator/AbstractSSTableIterator.java | 36 +++++++++--
 .../db/columniterator/SSTableIterator.java      |  4 +-
 .../columniterator/SSTableReversedIterator.java | 35 +++++++++--
 .../cassandra/db/marshal/CompositeType.java     |  5 ++
 .../cassandra/io/sstable/IndexHelper.java       |  6 +-
 8 files changed, 133 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb33de0..726eb04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Fix startup error when upgrading nodes (CASSANDRA-10136)
  * Base table PRIMARY KEY can be assumed to be NOT NULL in MV creation (CASSANDRA-10147)
  * Improve batchlog write patch (CASSANDRA-9673)
  * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 8056222..2561bbe 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -18,12 +18,16 @@
 package org.apache.cassandra.db;
 
 import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 
@@ -44,12 +48,65 @@ public class Serializers
         return new IndexInfo.Serializer(metadata, version);
     }
 
-    // Note that for the old layout, this will actually discard the cellname parts that are not strictly
-    // part of the clustering prefix. Don't use this if that's not what you want.
-    public ISerializer<ClusteringPrefix> clusteringPrefixSerializer(final Version version, final SerializationHeader header)
+    // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to
+    // ClusteringPrefix.serializer in IndexHelper directly. At which point this whole class probably becomes
+    // unnecessary (since IndexInfo.Serializer won't depend on the metadata either).
+    public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
     {
         if (!version.storeRows())
-            throw new UnsupportedOperationException();
+        {
+            return new ISerializer<ClusteringPrefix>()
+            {
+                public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+                {
+                    // We should only use this for reading old sstables, never for writing new ones.
+                    throw new UnsupportedOperationException();
+                }
+
+                public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+                {
+                    // We're reading the old cellname/composite
+                    ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
+                    assert bb.hasRemaining(); // empty cellnames were invalid
+
+                    int clusteringSize = metadata.clusteringColumns().size();
+                    // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
+                    if (clusteringSize == 0)
+                        return Clustering.EMPTY;
+
+                    if (!metadata.isCompound())
+                        return new Clustering(bb);
+
+                    List<ByteBuffer> components = CompositeType.splitName(bb);
+                    byte eoc = CompositeType.lastEOC(bb);
+
+                    if (eoc == 0 || components.size() >= clusteringSize)
+                    {
+                        // That's a clustering.
+                        if (components.size() > clusteringSize)
+                            components = components.subList(0, clusteringSize);
+
+                        return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+                    }
+                    else
+                    {
+                        // It's a range tombstone bound. It is a start since that's the only part we've ever included
+                        // in the index entries.
+                        Slice.Bound.Kind boundKind = eoc > 0
+                                                   ? Slice.Bound.Kind.EXCL_START_BOUND
+                                                   : Slice.Bound.Kind.INCL_START_BOUND;
+
+                        return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
+                    }
+                }
+
+                public long serializedSize(ClusteringPrefix clustering)
+                {
+                    // We should only use this for reading old sstables, never for writing new ones.
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
 
         return new ISerializer<ClusteringPrefix>()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index cee3fc4..1b41005 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -192,7 +192,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         if (result == null)
             return ImmutableBTreePartition.create(iter, maxRows);
 
-        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, false)), nowInSec()))
+        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed())), nowInSec()))
         {
             return ImmutableBTreePartition.create(merged, maxRows);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index cf4bff7..87a57c6 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -22,9 +22,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -453,6 +450,32 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             return indexes.size();
         }
 
+        // Update the block idx based on the current reader position if we're past the current block.
+        public void updateBlock() throws IOException
+        {
+            assert currentIndexIdx >= 0;
+            while (currentIndexIdx + 1 < indexes.size() && isPastCurrentBlock())
+            {
+                reader.openMarker = currentIndex().endOpenMarker;
+                ++currentIndexIdx;
+
+                // We have to set the mark, and we have to set it at the beginning of the block. So if we're not at the beginning of the block, this forces us into a weird seek dance.
+                // This can only happen when reading old files, however.
+                long startOfBlock = indexEntry.position + indexes.get(currentIndexIdx).offset;
+                long currentFilePointer = reader.file.getFilePointer();
+                if (startOfBlock == currentFilePointer)
+                {
+                    mark = reader.file.mark();
+                }
+                else
+                {
+                    reader.seekToPosition(startOfBlock);
+                    mark = reader.file.mark();
+                    reader.seekToPosition(currentFilePointer);
+                }
+            }
+        }
+
         // Check if we've crossed an index boundary (based on the mark on the beginning of the index block).
         public boolean isPastCurrentBlock()
         {
@@ -466,7 +489,12 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 
         public IndexHelper.IndexInfo currentIndex()
         {
-            return indexes.get(currentIndexIdx);
+            return index(currentIndexIdx);
+        }
+
+        public IndexHelper.IndexInfo index(int i)
+        {
+            return indexes.get(i);
         }
 
         // Finds the index of the first block containing the provided bound, starting at the provided index.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index e985c18..3536d65 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -265,9 +265,7 @@ public class SSTableIterator extends AbstractSSTableIterator
         protected Unfiltered computeNext() throws IOException
         {
             // Our previous read might have made us cross an index block boundary. If so, update our informations.
-            int currentBlockIdx = indexState.currentBlockIdx();
-            if (indexState.isPastCurrentBlock() && currentBlockIdx + 1 < indexState.blocksCount())
-                indexState.setToBlock(currentBlockIdx + 1);
+            indexState.updateBlock();
 
             // Return the next unfiltered unless we've reached the end, or we're beyond our slice
             // end (note that unless we're on the last block for the slice, there is no point

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index a5a1938..06855e3 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -116,7 +116,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
                 // every time, but that feels more wasteful) so we want to include everything from the beginning.
                 // We can stop at the slice end however since any following slice will be before that.
-                loadFromDisk(null, slice.end());
+                loadFromDisk(null, slice.end(), true);
             }
             setIterator(slice);
         }
@@ -150,15 +150,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
 
         // Reads the unfiltered from disk and load them into the reader buffer. It stops reading when either the partition
         // is fully read, or when stopReadingDisk() returns true.
-        protected void loadFromDisk(Slice.Bound start, Slice.Bound end) throws IOException
+        protected void loadFromDisk(Slice.Bound start, Slice.Bound end, boolean includeFirst) throws IOException
         {
             buffer.reset();
 
+            boolean isFirst = true;
+
             // If the start might be in this block, skip everything that comes before it.
             if (start != null)
             {
                 while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
                 {
+                    isFirst = false;
                     if (deserializer.nextIsRow())
                         deserializer.skipNext();
                     else
@@ -179,7 +182,10 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                    && !stopReadingDisk())
             {
                 Unfiltered unfiltered = deserializer.readNext();
-                buffer.add(unfiltered);
+                if (!isFirst || includeFirst)
+                    buffer.add(unfiltered);
+
+                isFirst = false;
 
                 if (unfiltered.isRangeTombstoneMarker())
                     updateOpenMarker((RangeTombstoneMarker)unfiltered);
@@ -224,7 +230,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             this.slice = slice;
             isInit = true;
 
-            // if our previous slicing already got us pas the beginning of the sstable, we're done
+            // if our previous slicing already got us past the beginning of the sstable, we're done
             if (indexState.isDone())
             {
                 iterator = Collections.emptyIterator();
@@ -293,8 +299,25 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             if (buffer == null)
                 buffer = createBuffer(indexState.blocksCount());
 
-            boolean canIncludeSliceStart = indexState.currentBlockIdx() == lastBlockIdx;
-            loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null);
+            int currentBlock = indexState.currentBlockIdx();
+
+            boolean canIncludeSliceStart = currentBlock == lastBlockIdx;
+
+            // When dealing with old format sstables, we have the problem that a row can span 2 index blocks, i.e. it can
+            // start at the end of a block and end at the beginning of the next one. That's not a problem per se for
+            // UnfilteredDeserializer.OldFormatSerializer, since it always reads rows entirely, even if they span index
+            // blocks, but as we are reading index blocks in reverse we must be careful not to read the end of the row at
+            // the beginning of a block before we've read the beginning of that row. So what we do is that if we detect
+            // that the row starting this block is also the row ending the previous one, we skip that first result and
+            // let it be read when we read the previous block.
+            boolean includeFirst = true;
+            if (!sstable.descriptor.version.storeRows() && currentBlock > 0)
+            {
+                ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName;
+                ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName;
+                includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0;
+            }
+            loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 633a994..9a00c70 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -200,6 +200,11 @@ public class CompositeType extends AbstractCompositeType
         return l;
     }
 
+    public static byte lastEOC(ByteBuffer name)
+    {
+        return name.get(name.limit() - 1);
+    }
+
     // Extract component idx from bb. Return null if there is not enough component.
     public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f009272b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index b57724a..4dabe69 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -152,7 +152,7 @@ public class IndexHelper
 
             public void serialize(IndexInfo info, DataOutputPlus out, SerializationHeader header) throws IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
                 clusteringSerializer.serialize(info.firstName, out);
                 clusteringSerializer.serialize(info.lastName, out);
                 out.writeLong(info.offset);
@@ -168,7 +168,7 @@ public class IndexHelper
 
             public IndexInfo deserialize(DataInputPlus in, SerializationHeader header) throws IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
 
                 ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
                 ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
@@ -183,7 +183,7 @@ public class IndexHelper
 
             public long serializedSize(IndexInfo info, SerializationHeader header)
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
                 long size = clusteringSerializer.serializedSize(info.firstName)
                           + clusteringSerializer.serializedSize(info.lastName)
                           + TypeSizes.sizeof(info.offset)