You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/26 22:59:25 UTC
[2/3] git commit: Fix streaming older SSTable yields row tombstones
Fix streaming older SSTable yields row tombstones
patch by yukim; reviewed by jbellis for CASSANDRA-6527
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2111a20b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2111a20b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2111a20b
Branch: refs/heads/trunk
Commit: 2111a20b4b44e557357f81146ead6cf7493a8d31
Parents: 1365749
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Dec 26 15:52:03 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Dec 26 15:52:03 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableWriter.java | 11 +++--
.../cassandra/io/sstable/LegacySSTableTest.java | 49 +++++++++++++++++++-
3 files changed, 56 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f38b58f..93cdd81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
* cqlsh: handle symlinks properly (CASSANDRA-6425)
* Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
* Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
+ * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
Merged from 1.2:
* Improved error message on bad properties in DDL queries (CASSANDRA-6453)
* Randomize batchlog candidates selection (CASSANDRA-6481)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 70c0b42..3d19d83 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -222,16 +222,19 @@ public class SSTableWriter extends SSTable
StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ // skip row size for version < ja
+ if (version.hasRowSizeAndColumnCount)
+ FileUtils.skipBytesFully(in, 8);
+
cf.delete(DeletionTime.serializer.deserialize(in));
ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream);
+
+ // read column count for version < ja
int columnCount = Integer.MAX_VALUE;
if (version.hasRowSizeAndColumnCount)
- {
- // skip row size
- FileUtils.skipBytesFully(in, 8);
columnCount = in.readInt();
- }
+
Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, columnCount, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2111a20b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 0b0ecf8..e508a55 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -25,8 +25,15 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.BeforeClass;
@@ -85,6 +92,46 @@ public class LegacySSTableTest extends SchemaLoader
*/
@Test
+ public void testStreaming() throws Throwable
+ {
+ StorageService.instance.initServer();
+
+ for (File version : LEGACY_SSTABLE_ROOT.listFiles())
+ if (Descriptor.Version.validate(version.getName()))
+ testStreaming(version.getName());
+ }
+
+ private void testStreaming(String version) throws Exception
+ {
+ SSTableReader sstable = SSTableReader.open(getDescriptor(version));
+ IPartitioner p = StorageService.getPartitioner();
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
+ ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
+ ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
+ details.add(new StreamSession.SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
+ .execute().get();
+ sstable.close();
+
+ ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
+ assert cfs.getSSTables().size() == 1;
+ sstable = cfs.getSSTables().iterator().next();
+ for (String keystring : TEST_DATA)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(keystring);
+ SSTableNamesIterator iter = new SSTableNamesIterator(sstable, Util.dk(key), FBUtilities.singleton(key));
+ ColumnFamily cf = iter.getColumnFamily();
+
+ // check not deleted (CASSANDRA-6527)
+ assert cf.deletionInfo().equals(DeletionInfo.live());
+ assert iter.next().name().equals(key);
+ }
+ }
+
+ @Test
public void testVersions() throws Throwable
{
for (File version : LEGACY_SSTABLE_ROOT.listFiles())