You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/06/02 10:50:54 UTC

[2/3] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e826951
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e826951
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e826951

Branch: refs/heads/cassandra-3.7
Commit: 1e82695115c7f191d4de60a92f7b9fd078ebbc68
Parents: 7eb4647 6c445d6
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:36:14 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:44:45 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  71 +++++++++-
 .../db/commitlog/CommitLogSegment.java          |   7 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  37 +++---
 .../db/commitlog/CompressedSegment.java         |   6 +-
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../db/RecoveryManagerFlushedTest.java          |  38 +++++-
 .../db/RecoveryManagerMissingHeaderTest.java    |  36 ++++-
 .../cassandra/db/RecoveryManagerTest.java       |  47 +++++--
 .../db/RecoveryManagerTruncateTest.java         |  42 +++++-
 .../db/commitlog/CommitLogDescriptorTest.java   | 102 ++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 132 +++++++------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   2 +-
 13 files changed, 389 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0cafa83,9752d16..70da4ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.2.7
 +3.0.7
 + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
+  * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
   * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
   * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c0e12c5,460ecfe..dcdd855
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -70,11 -70,10 +70,10 @@@ public class CommitLog implements Commi
      final CommitLogMetrics metrics;
      final AbstractCommitLogService executor;
  
-     final ICompressor compressor;
-     public ParameterizedClass compressorClass;
+     volatile Configuration configuration;
      final public String location;
  
 -    static private CommitLog construct()
 +    private static CommitLog construct()
      {
          CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
  
@@@ -433,6 -432,14 +431,14 @@@
      }
  
      /**
+      * FOR TESTING PURPOSES.
+      */
+     public void resetConfiguration()
+     {
 -        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++        configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+     }
+ 
+     /**
       * FOR TESTING PURPOSES.  See CommitLogAllocator
       */
      public int restartUnsafe() throws IOException
@@@ -487,4 -494,59 +493,59 @@@
                  throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
          }
      }
+ 
+     public static final class Configuration
+     {
+         /**
+          * The compressor class.
+          */
+         private final ParameterizedClass compressorClass;
+ 
+         /**
+          * The compressor used to compress the segments.
+          */
+         private final ICompressor compressor;
+ 
+         public Configuration(ParameterizedClass compressorClass)
+         {
+             this.compressorClass = compressorClass;
 -            this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
++            this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
+         }
+ 
+         /**
+          * Checks if the segments must be compressed.
+          * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+          */
+         public boolean useCompression()
+         {
+             return compressor != null;
+         }
+ 
+         /**
+          * Returns the compressor used to compress the segments.
+          * @return the compressor used to compress the segments
+          */
+         public ICompressor getCompressor()
+         {
+             return compressor;
+         }
+ 
+         /**
+          * Returns the compressor class.
+          * @return the compressor class
+          */
+         public ParameterizedClass getCompressorClass()
+         {
+             return compressorClass;
+         }
+ 
+         /**
+          * Returns the compressor name.
+          * @return the compressor name.
+          */
+         public String getCompressorName()
+         {
+             return useCompression() ? compressor.getClass().getSimpleName() : "none";
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index aedc9da,ba28f3e..27c05b4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -118,22 -117,12 +118,23 @@@ public abstract class CommitLogSegmen
      final CommitLog commitLog;
      public final CommitLogDescriptor descriptor;
  
 -    static CommitLogSegment createSegment(CommitLog commitLog)
 +    static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
      {
-         return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
 -        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
++        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
+                                                         : new MemoryMappedSegment(commitLog);
      }
  
 +    /**
 +     * Checks if the segments use a buffer pool.
 +     *
 +     * @param commitLog the commit log
 +     * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
 +     */
 +    static boolean usesBufferPool(CommitLog commitLog)
 +    {
-         return commitLog.compressor != null;
++        return commitLog.configuration.useCompression();
 +    }
 +
      static long getNextId()
      {
          return idBase + nextId.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 2ee4eed,8670fd7..66ad6a3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@@ -21,11 -21,11 +21,9 @@@ import java.io.File
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Collections;
--import java.util.HashSet;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
--import java.util.Set;
  import java.util.UUID;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.ConcurrentLinkedQueue;
@@@ -34,24 -34,25 +32,26 @@@ import java.util.concurrent.LinkedBlock
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
  
--import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
--import com.google.common.util.concurrent.*;
 -
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
  import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.Pair;
--import org.apache.cassandra.utils.concurrent.WaitQueue;
  import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
  
--import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Iterables;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.Runnables;
++import com.google.common.util.concurrent.Uninterruptibles;
  
  /**
   * Performs eager-creation of commit log segments in a background thread. All the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 0ec0bca,219709b..c73a30a
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -66,11 -58,10 +66,11 @@@ public class CompressedSegment extends 
      /**
       * Constructs a new segment file.
       */
 -    CompressedSegment(CommitLog commitLog)
 +    CompressedSegment(CommitLog commitLog, Runnable onClose)
      {
          super(commitLog);
-         this.compressor = commitLog.compressor;
+         this.compressor = commitLog.configuration.getCompressor();
 +        this.onClose = onClose;
          try
          {
              channel.write((ByteBuffer) buffer.duplicate().flip());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,0000000..d06c112
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -1,95 -1,0 +1,131 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.db;
 +
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
++
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.SchemaLoader;
- import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.SchemaKeyspace;
 +import org.apache.cassandra.utils.FBUtilities;
 +
++@RunWith(Parameterized.class)
 +public class RecoveryManagerFlushedTest
 +{
 +    private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
 +
 +    private static final String KEYSPACE1 = "RecoveryManager2Test";
 +    private static final String CF_STANDARD1 = "Standard1";
 +    private static final String CF_STANDARD2 = "Standard2";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
 +    }
 +
++    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][] {
++                { null }, // No compression
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++    }
++
 +    @Test
 +    /* test that commit logs do not replay flushed data */
 +    public void testWithFlush() throws Exception
 +    {
 +        // Flush everything that may be in the commit log now to start fresh
 +        FBUtilities.waitOnFutures(Keyspace.open(SystemKeyspace.NAME).flush());
 +        FBUtilities.waitOnFutures(Keyspace.open(SchemaKeyspace.NAME).flush());
 +
 +
 +        CompactionManager.instance.disableAutoCompaction();
 +
 +        // add a row to another CF so we test skipping mutations within a not-entirely-flushed CF
 +        insertRow("Standard2", "key");
 +
 +        for (int i = 0; i < 100; i++)
 +        {
 +            String key = "key" + i;
 +            insertRow("Standard1", key);
 +        }
 +
 +        Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace1.getColumnFamilyStore("Standard1");
 +        logger.debug("forcing flush");
 +        cfs.forceBlockingFlush();
 +
 +        logger.debug("begin manual replay");
 +        // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
 +        // will be replayed)
 +        int replayed = CommitLog.instance.resetUnsafe(false);
 +        assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
 +    }
 +
 +    private void insertRow(String cfname, String key)
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        new RowUpdateBuilder(cfs.metadata, 0, key)
 +            .clustering("c")
 +            .add("val", "val1")
 +            .build()
 +            .apply();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,0000000..8ac7c5d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -1,88 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.File;
 +import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
 +
 +import org.junit.Assert;
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
- import org.apache.cassandra.db.rows.UnfilteredRowIterator;
++import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +
++@RunWith(Parameterized.class)
 +public class RecoveryManagerMissingHeaderTest
 +{
 +    private static final String KEYSPACE1 = "RecoveryManager3Test1";
 +    private static final String CF_STANDARD1 = "Standard1";
 +
 +    private static final String KEYSPACE2 = "RecoveryManager3Test2";
 +    private static final String CF_STANDARD3 = "Standard3";
 +
++    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][] {
++                { null }, // No compression
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++    }
++
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
 +        SchemaLoader.createKeyspace(KEYSPACE2,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
 +    }
 +
 +    @Test
 +    public void testMissingHeader() throws IOException
 +    {
 +        Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +        Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 +
 +        DecoratedKey dk = Util.dk("keymulti");
 +        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
 +                                       .clustering("col1").add("val", "1")
 +                                       .build());
 +
 +        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
 +                                       .clustering("col1").add("val", "1")
 +                                       .build());
 +
 +        keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
 +        keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
 +
 +        // nuke the header
 +        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
 +        {
 +            if (file.getName().endsWith(".header"))
 +                FileUtils.deleteWithConfirm(file);
 +        }
 +
 +        CommitLog.instance.resetUnsafe(false);
 +
 +        Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
 +        Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466,5676b99..397030a
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -22,35 -25,34 +25,41 @@@ import java.util.Collections
  import java.util.Date;
  import java.util.concurrent.TimeUnit;
  
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
- import org.apache.cassandra.Util;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.context.CounterContext;
- import org.apache.cassandra.exceptions.ConfigurationException;
- 
  import org.junit.Assert;
 +import org.junit.Before;
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
- import static org.junit.Assert.assertEquals;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 -import org.apache.cassandra.db.marshal.CounterColumnType;
++import org.apache.cassandra.db.context.CounterContext;
++import org.apache.cassandra.db.rows.Row;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
- @RunWith(OrderedJUnit4ClassRunner.class)
 -import static org.apache.cassandra.Util.cellname;
 -import static org.apache.cassandra.Util.column;
 -import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++import static org.junit.Assert.assertEquals;
+ 
+ @RunWith(Parameterized.class)
  public class RecoveryManagerTest
  {
 +    private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
 +
      private static final String KEYSPACE1 = "RecoveryManagerTest1";
      private static final String CF_STANDARD1 = "Standard1";
      private static final String CF_COUNTER1 = "Counter1";
@@@ -71,13 -75,20 +80,27 @@@
                                      SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
      }
  
 +    @Before
 +    public void clearData()
 +    {
 +        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
 +        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
 +        Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
 +    }
+     public RecoveryManagerTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
  
      @Test
      public void testNothingToRecover() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,769316f..5a59f1c
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -18,18 -18,33 +18,30 @@@
  */
  package org.apache.cassandra.db;
  
 -import static org.apache.cassandra.Util.column;
 -import static org.junit.Assert.*;
 -
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ 
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +
- import org.junit.BeforeClass;
- import org.junit.Test;
- 
- import static org.junit.Assert.*;
++import static org.junit.Assert.assertTrue;
  
  /**
   * Test for the truncate operation.
@@@ -38,7 -54,29 +51,28 @@@ public class RecoveryManagerTruncateTes
  {
      private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
      private static final String CF_STANDARD1 = "Standard1";
 -    private static final String CF_STANDARD2 = "Standard2";
  
+     public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 0000000,8d63959..898c19f
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -1,0 -1,103 +1,102 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import com.google.common.collect.ImmutableMap;
+ 
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.util.ByteBufferDataInput;
 -import org.apache.cassandra.io.util.FileDataInput;
++import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ public class CommitLogDescriptorTest
+ {
+     @Test
+     public void testVersions()
+     {
+         assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+         assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+ 
+         assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+ 
+         assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+         assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+     }
+ 
+     private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+     {
+         ByteBuffer buf = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buf, desc);
 -        long length = buf.position();
+         // Put some extra data in the stream.
+         buf.putDouble(0.1);
+         buf.flip();
 -        try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
++
++        try (DataInputBuffer input = new DataInputBuffer(buf, false))
+         {
+             CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
 -            assertEquals("Descriptor length", length, input.getFilePointer());
+             assertEquals("Descriptors", desc, read);
+         }
+     }
+ 
+     @Test
+     public void testDescriptorPersistence() throws IOException
+     {
+         testDescriptorPersistence(new CommitLogDescriptor(11, null));
+         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+                 new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+     }
+ 
+     @Test
+     public void testDescriptorInvalidParametersSize() throws IOException
+     {
 -        Map<String, String> params = new HashMap<>();
 -        for (int i=0; i<6000; ++i)
++        final int numberOfParameters = 65535;
++        Map<String, String> params = new HashMap<>(numberOfParameters);
++        for (int i=0; i<numberOfParameters; ++i)
+             params.put("key"+i, Integer.toString(i, 16));
+         try {
 -            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
++            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+                                                                21,
+                                                                new ParameterizedClass("LZ4Compressor", params));
+             ByteBuffer buf = ByteBuffer.allocate(1024000);
+             CommitLogDescriptor.writeHeader(buf, desc);
+             fail("Parameter object too long should fail on writing descriptor.");
+         } catch (ConfigurationException e)
+         {
+             // correct path
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 555cdda,9999b42..39ba886
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -16,23 -16,15 +16,20 @@@
  * specific language governing permissions and limitations
  * under the License.
  */
 -
  package org.apache.cassandra.db.commitlog;
  
- import static junit.framework.Assert.assertTrue;
- import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
- import static org.junit.Assert.assertEquals;
- 
 -import java.io.*;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
  import java.nio.ByteBuffer;
- import java.util.HashMap;
- import java.util.Map;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  import java.util.UUID;
 +import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
@@@ -50,31 -39,54 +44,60 @@@ import org.junit.runners.Parameterized.
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
- import org.apache.cassandra.config.Config.CommitFailurePolicy;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.commitlog.CommitLogSegment;
 -import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.ColumnFamilyStore;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.RowUpdateBuilder;
  import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.composites.CellNameType;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.marshal.BytesType;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.KillerForTests;
 +import org.apache.cassandra.utils.vint.VIntCoding;
  
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
+ 
+ @RunWith(Parameterized.class)
  public class CommitLogTest
  {
      private static final String KEYSPACE1 = "CommitLogTest";
      private static final String KEYSPACE2 = "CommitLogTestNonDurable";
 -    private static final String CF1 = "Standard1";
 -    private static final String CF2 = "Standard2";
 +    private static final String STANDARD1 = "Standard1";
 +    private static final String STANDARD2 = "Standard2";
  
+     public CommitLogTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {
@@@ -186,29 -208,21 +209,28 @@@
      @Test
      public void testDontDeleteIfDirty() throws Exception
      {
-         CommitLog.instance.resetUnsafe(true);
 +        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
          // Roughly 32 MB mutation
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
 +        Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
 +                     .clustering("bytes")
 +                     .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
 +                     .build();
  
          // Adding it 5 times
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
  
          // Adding new mutation on another CF
 -        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
 -        CommitLog.instance.add(rm2);
 +        Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(4))
 +                      .build();
 +        CommitLog.instance.add(m2);
  
          assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
  
@@@ -222,16 -236,10 +244,14 @@@
      @Test
      public void testDeleteIfNotDirty() throws Exception
      {
--        DatabaseDescriptor.getCommitLogSegmentSize();
-         CommitLog.instance.resetUnsafe(true);
 +        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
          // Roughly 32 MB mutation
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
 +        Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
 +                      .build();
  
          // Adding it twice (won't change segment)
          CommitLog.instance.add(rm);
@@@ -302,12 -294,8 +322,12 @@@
      @Test
      public void testEqualRecordLimit() throws Exception
      {
-         CommitLog.instance.resetUnsafe(true);
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
 +                      .build();
++
          CommitLog.instance.add(rm);
      }
  
@@@ -448,129 -432,57 +468,69 @@@
      }
  
      @Test
-     public void testVersions()
-     {
-         Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-         Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
- 
-         assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
- 
-         assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-         assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
-     }
- 
-     @Test
      public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
      {
 -        boolean prev = DatabaseDescriptor.isAutoSnapshot();
 -        DatabaseDescriptor.setAutoSnapshot(false);
 -        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
 -        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
 -
 -        final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
 -        rm1.apply();
 -        cfs1.truncateBlocking();
 -        DatabaseDescriptor.setAutoSnapshot(prev);
 -        final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
 -
 -        for (int i = 0 ; i < 5 ; i++)
 -            CommitLog.instance.add(rm2);
 -
 -        Assert.assertEquals(2, CommitLog.instance.activeSegments());
 -        ReplayPosition position = CommitLog.instance.getContext();
 -        for (Keyspace ks : Keyspace.system())
 -            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 -                CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
 -        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
 -        Assert.assertEquals(1, CommitLog.instance.activeSegments());
 +        boolean originalState = DatabaseDescriptor.isAutoSnapshot();
 +        try
 +        {
 +            CommitLog.instance.resetUnsafe(true);
 +            boolean prev = DatabaseDescriptor.isAutoSnapshot();
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +            ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
 +            new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
 +            cfs1.truncateBlocking();
 +            DatabaseDescriptor.setAutoSnapshot(prev);
 +            Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
 +                          .clustering("bytes")
 +                          .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
 +                          .build();
 +
 +            for (int i = 0 ; i < 5 ; i++)
 +                CommitLog.instance.add(m2);
 +
 +            assertEquals(2, CommitLog.instance.activeSegments());
 +            ReplayPosition position = CommitLog.instance.getContext();
 +            for (Keyspace ks : Keyspace.system())
 +                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 +                    CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
 +            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
 +            assertEquals(1, CommitLog.instance.activeSegments());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
      }
  
      @Test
      public void testTruncateWithoutSnapshotNonDurable() throws IOException
      {
-         CommitLog.instance.resetUnsafe(true);
 -        boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
 -        DatabaseDescriptor.setAutoSnapshot(false);
 -        Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
 -        Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
 -        ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
 -        CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
 -        Mutation rm;
 -        DecoratedKey dk = Util.dk("key1");
 -
 -        // add data
 -        rm = new Mutation(KEYSPACE2, dk.getKey());
 -        rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
 -        rm.apply();
 -
 -        ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
 -        Row row = command.getRow(notDurableKs);
 -        Cell col = row.cf.getColumn(Util.cellname("Column1"));
 -        Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
 -        cfs.truncateBlocking();
 -        DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
 -        row = command.getRow(notDurableKs);
 -        Assert.assertEquals(null, row.cf);
 +        boolean originalState = DatabaseDescriptor.getAutoSnapshot();
 +        try
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
 +            Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
 +
 +            ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
 +            new RowUpdateBuilder(cfs.metadata, 0, "key1")
 +                .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .applyUnsafe();
 +
 +            assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
 +                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
 +
 +            cfs.truncateBlocking();
 +
 +            Util.assertEmpty(Util.cmd(cfs).columns("val").build());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
      }
- 
-     private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
-     {
-         ByteBuffer buf = ByteBuffer.allocate(1024);
-         CommitLogDescriptor.writeHeader(buf, desc);
-         // Put some extra data in the stream.
-         buf.putDouble(0.1);
-         buf.flip();
- 
-         DataInputBuffer input = new DataInputBuffer(buf, false);
-         CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-         Assert.assertEquals("Descriptors", desc, read);
-     }
- 
-     @Test
-     public void testDescriptorPersistence() throws IOException
-     {
-         testDescriptorPersistence(new CommitLogDescriptor(11, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
-                 new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
-     }
- 
-     @Test
-     public void testDescriptorInvalidParametersSize() throws IOException
-     {
-         Map<String, String> params = new HashMap<>();
-         for (int i=0; i<65535; ++i)
-             params.put("key"+i, Integer.toString(i, 16));
-         try {
-             CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
-                                                                21,
-                                                                new ParameterizedClass("LZ4Compressor", params));
-             ByteBuffer buf = ByteBuffer.allocate(1024000);
-             CommitLogDescriptor.writeHeader(buf, desc);
-             Assert.fail("Parameter object too long should fail on writing descriptor.");
-         } catch (ConfigurationException e)
-         {
-             // correct path
-         }
-     }
  }
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------