You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:14 UTC

[23/23] cassandra git commit: Merge branch 'cassandra-3.9' into trunk

Merge branch 'cassandra-3.9' into trunk

* cassandra-3.9:
  Change commitlog and sstables to track dirty and clean intervals.
  Disable passing control to post-flush after flush failure to prevent data loss.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/624ed783
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/624ed783
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/624ed783

Branch: refs/heads/trunk
Commit: 624ed7838bafa96c2083d5a10ebe9ef44f12dcf8
Parents: 7fe4309 7b10217
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Aug 5 15:43:46 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:48:18 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  13 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  70 +---
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  21 +-
 .../AbstractCommitLogSegmentManager.java        |   4 +-
 .../cassandra/db/commitlog/CommitLog.java       |  11 +-
 .../db/commitlog/CommitLogReplayer.java         | 105 ++----
 .../db/commitlog/CommitLogSegment.java          |  82 +++--
 .../cassandra/db/commitlog/IntervalSet.java     | 192 +++++++++++
 .../compaction/AbstractCompactionStrategy.java  |   3 +
 .../compaction/CompactionStrategyManager.java   |   3 +
 .../apache/cassandra/db/lifecycle/Tracker.java  |  45 +--
 .../org/apache/cassandra/db/lifecycle/View.java |  35 +-
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../metadata/LegacyMetadataSerializer.java      |  17 +-
 .../io/sstable/metadata/MetadataCollector.java  |  38 +--
 .../io/sstable/metadata/StatsMetadata.java      |  44 +--
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
 .../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mc_clust/mc-1-big-Data.db            | Bin 0 -> 5355 bytes
 .../legacy_mc_clust/mc-1-big-Digest.crc32       |   1 +
 .../legacy_mc_clust/mc-1-big-Filter.db          | Bin 0 -> 24 bytes
 .../legacy_mc_clust/mc-1-big-Index.db           | Bin 0 -> 157553 bytes
 .../legacy_mc_clust/mc-1-big-Statistics.db      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust/mc-1-big-Summary.db         | Bin 0 -> 47 bytes
 .../legacy_mc_clust/mc-1-big-TOC.txt            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_mc_clust_compact/mc-1-big-Data.db    | Bin 0 -> 5382 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_compact/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_compact/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_compact/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../legacy_mc_clust_counter/mc-1-big-Data.db    | Bin 0 -> 4631 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_counter/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_counter/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_counter/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 4625 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple/mc-1-big-Data.db           | Bin 0 -> 89 bytes
 .../legacy_mc_simple/mc-1-big-Digest.crc32      |   1 +
 .../legacy_mc_simple/mc-1-big-Filter.db         | Bin 0 -> 24 bytes
 .../legacy_mc_simple/mc-1-big-Index.db          | Bin 0 -> 26 bytes
 .../legacy_mc_simple/mc-1-big-Statistics.db     | Bin 0 -> 4639 bytes
 .../legacy_mc_simple/mc-1-big-Summary.db        | Bin 0 -> 47 bytes
 .../legacy_mc_simple/mc-1-big-TOC.txt           |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_compact/mc-1-big-Data.db   | Bin 0 -> 91 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_compact/mc-1-big-Index.db  | Bin 0 -> 26 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4680 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_compact/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_counter/mc-1-big-Data.db   | Bin 0 -> 110 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_counter/mc-1-big-Index.db  | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4648 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_counter/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4689 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../db/commitlog/CommitLogStressTest.java       |   3 +-
 test/unit/org/apache/cassandra/Util.java        |  20 ++
 .../org/apache/cassandra/cql3/CQLTester.java    |  12 +-
 .../apache/cassandra/cql3/OutOfSpaceTest.java   |  33 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 151 ++++++++-
 .../cassandra/db/compaction/NeverPurgeTest.java |   6 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  12 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../metadata/MetadataSerializerTest.java        |  16 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 .../cassandra/utils/IntegerIntervalsTest.java   | 326 +++++++++++++++++++
 98 files changed, 1223 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index eb9759e,e32c204..b6eea94
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -36,8 -38,8 +36,9 @@@ import org.apache.cassandra.db.Mutation
  import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.utils.CLibrary;
+ import org.apache.cassandra.utils.IntegerInterval;
  import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 2dccf3c,14e391b..6cc33f5
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@@ -21,8 -21,11 +21,9 @@@ import java.io.*
  import java.nio.ByteBuffer;
  import java.util.*;
  
 -import com.google.common.collect.Maps;
 -
  import org.apache.cassandra.db.TypeSizes;
  import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.format.Version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index eb9abcf,196cfbf..730f9f0
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -24,8 -23,8 +24,6 @@@ import java.util.EnumMap
  import java.util.List;
  import java.util.Map;
  
- import com.google.common.collect.Ordering;
 -import com.google.common.collect.Maps;
--
  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
  import com.clearspring.analytics.stream.cardinality.ICardinality;
  import org.apache.cassandra.db.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index b455ad7,6686684..acad0c5
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -112,8 -112,7 +112,7 @@@ public class SSTableMetadataViewe
                      out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
                      out.printf("SSTable Level: %d%n", stats.sstableLevel);
                      out.printf("Repaired at: %d%n", stats.repairedAt);
-                     out.printf("Minimum replay position: %s%n", stats.commitLogLowerBound);
-                     out.printf("Maximum replay position: %s%n", stats.commitLogUpperBound);
 -                    out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
++                    out.printf("Replay positions covered: %s%n", stats.commitLogIntervals);
                      out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
                      out.printf("totalRows: %s%n", stats.totalRows);
                      out.println("Estimated tombstone drop times:");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 4bc5f6b,6ab7d46..30dffe5
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -277,8 -278,8 +278,8 @@@ public class CommitLogTes
  
          // "Flush": this won't delete anything
          UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
 -        CommitLog.instance.sync(true);
 +        CommitLog.instance.sync();
-         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
+         CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
          assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
@@@ -683,5 -693,137 +684,137 @@@
              }
          }
      }
+ 
+     public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+ 
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+         try
+         {
+             DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+ 
+             for (int i = 0 ; i < 5 ; i++)
+             {
+                 new RowUpdateBuilder(cfs.metadata, 0, "k")
+                     .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+                     .build()
+                     .apply();
+ 
+                 if (i == 2)
+                 {
+                     try (Closeable c = Util.markDirectoriesUnwriteable(cfs))
+                     {
+                         cfs.forceBlockingFlush();
+                     }
+                     catch (Throwable t)
+                     {
+                         // expected. Cause (after some wrappings) should be a write error
+                         while (!(t instanceof FSWriteError))
+                             t = t.getCause();
+                     }
+                 }
+                 else
+                     cfs.forceBlockingFlush();
+             }
+         }
+         finally
+         {
+             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+         }
+ 
 -        CommitLog.instance.sync(true);
++        CommitLog.instance.sync();
+         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
+         // If retries work subsequent flushes should clear up error and this should change to expect 0.
+         Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+     }
+ 
+     public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction)
+             throws ExecutionException, InterruptedException, IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+ 
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         for (int i = 0 ; i < 5 ; i++)
+         {
+             new RowUpdateBuilder(cfs.metadata, 0, "k")
+                 .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+                 .build()
+                 .apply();
+ 
+             Memtable current = cfs.getTracker().getView().getCurrentMemtable();
+             if (i == 2)
+                 current.makeUnflushable();
+ 
+             flushAction.accept(cfs, current);
+         }
+         if (performCompaction)
+             cfs.forceMajorCompaction();
+         // Make sure metadata saves and reads fine
+         for (SSTableReader reader : cfs.getLiveSSTables())
+             reader.reloadSSTableMetadata();
+ 
 -        CommitLog.instance.sync(true);
++        CommitLog.instance.sync();
+         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
+         // persisted all data in the commit log. Because we know there was an error, there must be something left to
+         // replay.
+         Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+     }
+ 
+     BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) ->
+     {
+         try
+         {
+             cfs.forceBlockingFlush();
+         }
+         catch (Throwable t)
+         {
+             // expected after makeUnflushable. Cause (after some wrappings) should be a write error
+             while (!(t instanceof FSWriteError))
+                 t = t.getCause();
+             // Wait for started flushes to complete.
+             cfs.switchMemtableIfCurrent(current);
+         }
+     };
+ 
+     BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) ->
+     {
+         // Move to new commit log segment and try to flush all data. Also delete segments that no longer contain
+         // flushed data.
+         // This does not stop on errors and should retain segments for which flushing failed.
+         CommitLog.instance.forceRecycleAllSegments();
+ 
+         // Wait for started flushes to complete.
+         cfs.switchMemtableIfCurrent(current);
+     };
+ 
+     @Test
+     public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(flush, false);
+     }
+ 
+     @Test
+     public void testOutOfOrderLogDiscard() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(recycleSegments, false);
+     }
+ 
+     @Test
+     public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(flush, true);
+     }
+ 
+     @Test
+     public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(recycleSegments, true);
+     }
  }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 62d0479,4bd4489..eb50c11
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@@ -29,9 -30,10 +29,10 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.db.SerializationHeader;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.SerializationHeader;
  import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
  import org.apache.cassandra.dht.RandomPartitioner;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624ed783/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index fe75da1,fe75da1..04be91a
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -134,7 -134,7 +134,7 @@@ public class StreamTransferTaskTes
  
          // create streaming task that streams those two sstables
          StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
--        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
++        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getLiveSSTables().size());
          for (SSTableReader sstable : cfs.getLiveSSTables())
          {
              List<Range<Token>> ranges = new ArrayList<>();