You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/10 15:07:39 UTC

[04/11] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0918ba01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0918ba01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0918ba01

Branch: refs/heads/trunk
Commit: 0918ba01176392deadf8655a61dad44979a49ee5
Parents: 6a449b8 3539a07
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 7 12:27:08 2018 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 7 12:27:08 2018 +0000

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  8 ++--
 .../compaction/AbstractCompactionStrategy.java  |  5 ++-
 .../compaction/CompactionStrategyManager.java   | 14 +++---
 .../db/lifecycle/LifecycleNewTracker.java       | 47 ++++++++++++++++++++
 .../db/lifecycle/LifecycleTransaction.java      |  7 ++-
 .../apache/cassandra/db/lifecycle/LogFile.java  | 24 ++++------
 .../cassandra/db/lifecycle/LogTransaction.java  |  2 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    | 16 +++----
 .../sstable/format/RangeAwareSSTableWriter.java | 12 ++---
 .../io/sstable/format/SSTableWriter.java        | 24 +++++-----
 .../io/sstable/format/big/BigFormat.java        |  6 +--
 .../io/sstable/format/big/BigTableWriter.java   |  6 +--
 .../cassandra/streaming/StreamReader.java       |  6 +--
 .../cassandra/streaming/StreamReceiveTask.java  | 35 ++++++++++++++-
 .../cassandra/streaming/StreamSession.java      |  4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  5 ++-
 16 files changed, 150 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 13f0280,c455c4c..700c1cc
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -507,15 -475,15 +507,15 @@@ public class ColumnFamilyStore implemen
          return directories;
      }
  
-     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
      {
          MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-         return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+         return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
      }
  
-     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
      {
-         return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
 -        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, lifecycleNewTracker);
++        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker);
      }
  
      public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 8454147,9f07691..3d7800d
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -26,7 -26,8 +26,8 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.index.Index;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@@ -571,20 -514,9 +572,20 @@@ public abstract class AbstractCompactio
          return groupedSSTables;
      }
  
 -    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
 +    public CompactionLogger.Strategy strategyLogger()
 +    {
 +        return CompactionLogger.Strategy.none;
 +    }
 +
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                       long keyCount,
 +                                                       long repairedAt,
 +                                                       MetadataCollector meta,
 +                                                       SerializationHeader header,
 +                                                       Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
++                                                       LifecycleNewTracker lifecycleNewTracker)
      {
-         return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
 -        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, lifecycleNewTracker);
++        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, lifecycleNewTracker);
      }
  
      public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a50f428,1d3d18c..86170a1
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -20,30 -20,20 +20,30 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  import java.util.concurrent.Callable;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
- 
- import org.apache.cassandra.db.DiskBoundaries;
- import org.apache.cassandra.db.Memtable;
- import org.apache.cassandra.index.Index;
 +import com.google.common.primitives.Ints;
  
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Directories;
++import org.apache.cassandra.db.DiskBoundaries;
+ import org.apache.cassandra.db.Memtable;
  import org.apache.cassandra.db.SerializationHeader;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
++import org.apache.cassandra.index.Index;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -1010,43 -490,15 +1010,43 @@@ public class CompactionStrategyManager 
          return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
      }
  
 -    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                       long keyCount,
 +                                                       long repairedAt,
 +                                                       MetadataCollector collector,
 +                                                       SerializationHeader header,
 +                                                       Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
++                                                       LifecycleNewTracker lifecycleNewTracker)
      {
 -        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +        maybeReloadDiskBoundaries();
 +        readLock.lock();
 +        try
          {
 -            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
 +            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +            {
-                 return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
++                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
 +            }
 +            else
 +            {
-                 return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
++                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
 +            }
          }
 -        else
 +        finally
 +        {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    public boolean isRepaired(AbstractCompactionStrategy strategy)
 +    {
 +        readLock.lock();
 +        try
 +        {
 +            return repaired.contains(strategy);
 +        }
 +        finally
          {
 -            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
 +            readLock.unlock();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2217ae2,ded070e..76e4dbb
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@@ -25,9 -25,8 +25,9 @@@ import java.util.UUID
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.index.Index;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@@ -35,11 -34,9 +35,11 @@@
  public class SimpleSSTableMultiWriter implements SSTableMultiWriter
  {
      private final SSTableWriter writer;
-     private final LifecycleTransaction txn;
++    private final LifecycleNewTracker lifecycleNewTracker;
  
-     protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
 -    protected SimpleSSTableMultiWriter(SSTableWriter writer)
++    protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker)
      {
-         this.txn = txn;
++        this.lifecycleNewTracker = lifecycleNewTracker;
          this.writer = writer;
      }
  
@@@ -92,7 -89,6 +92,7 @@@
  
      public Throwable abort(Throwable accumulate)
      {
-         txn.untrackNew(writer);
++        lifecycleNewTracker.untrackNew(writer);
          return writer.abort(accumulate);
      }
  
@@@ -113,10 -109,9 +113,10 @@@
                                              CFMetaData cfm,
                                              MetadataCollector metadataCollector,
                                              SerializationHeader header,
 +                                            Collection<Index> indexes,
-                                             LifecycleTransaction txn)
+                                             LifecycleNewTracker lifecycleNewTracker)
      {
-         SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
-         return new SimpleSSTableMultiWriter(writer, txn);
 -        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, lifecycleNewTracker);
 -        return new SimpleSSTableMultiWriter(writer);
++        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, lifecycleNewTracker);
++        return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 353aacb,0000000..3358225
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -1,208 -1,0 +1,208 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.DiskBoundaries;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class RangeAwareSSTableWriter implements SSTableMultiWriter
 +{
 +    private final List<PartitionPosition> boundaries;
 +    private final List<Directories.DataDirectory> directories;
 +    private final int sstableLevel;
 +    private final long estimatedKeys;
 +    private final long repairedAt;
 +    private final SSTableFormat.Type format;
 +    private final SerializationHeader header;
-     private final LifecycleTransaction txn;
++    private final LifecycleNewTracker lifecycleNewTracker;
 +    private int currentIndex = -1;
 +    public final ColumnFamilyStore cfs;
 +    private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
 +    private final List<SSTableReader> finishedReaders = new ArrayList<>();
 +    private SSTableMultiWriter currentWriter = null;
 +
-     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
++    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
 +    {
 +        DiskBoundaries db = cfs.getDiskBoundaries();
 +        directories = db.directories;
 +        this.sstableLevel = sstableLevel;
 +        this.cfs = cfs;
 +        this.estimatedKeys = estimatedKeys / directories.size();
 +        this.repairedAt = repairedAt;
 +        this.format = format;
-         this.txn = txn;
++        this.lifecycleNewTracker = lifecycleNewTracker;
 +        this.header = header;
 +        boundaries = db.positions;
 +        if (boundaries == null)
 +        {
 +            Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
 +            if (localDir == null)
 +                throw new IOException(String.format("Insufficient disk space to store %s",
 +                                                    FBUtilities.prettyPrintMemory(totalSize)));
 +            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
 +        }
 +    }
 +
 +    private void maybeSwitchWriter(DecoratedKey key)
 +    {
 +        if (boundaries == null)
 +            return;
 +
 +        boolean switched = false;
 +        while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
 +        {
 +            switched = true;
 +            currentIndex++;
 +        }
 +
 +        if (switched)
 +        {
 +            if (currentWriter != null)
 +                finishedWriters.add(currentWriter);
 +
 +            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
 +        }
 +    }
 +
 +    public boolean append(UnfilteredRowIterator partition)
 +    {
 +        maybeSwitchWriter(partition.partitionKey());
 +        return currentWriter.append(partition);
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +        {
 +            if (writer.getFilePointer() > 0)
 +                finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
 +            else
 +                SSTableMultiWriter.abortOrDie(writer);
 +        }
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(boolean openResult)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +        {
 +            if (writer.getFilePointer() > 0)
 +                finishedReaders.addAll(writer.finish(openResult));
 +            else
 +                SSTableMultiWriter.abortOrDie(writer);
 +        }
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finished()
 +    {
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public SSTableMultiWriter setOpenResult(boolean openResult)
 +    {
 +        finishedWriters.forEach((w) -> w.setOpenResult(openResult));
 +        currentWriter.setOpenResult(openResult);
 +        return this;
 +    }
 +
 +    public String getFilename()
 +    {
 +        return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
 +    }
 +
 +    @Override
 +    public long getFilePointer()
 +    {
 +        return currentWriter.getFilePointer();
 +    }
 +
 +    @Override
 +    public UUID getCfId()
 +    {
 +        return currentWriter.getCfId();
 +    }
 +
 +    @Override
 +    public Throwable commit(Throwable accumulate)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +            accumulate = writer.commit(accumulate);
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public Throwable abort(Throwable accumulate)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter finishedWriter : finishedWriters)
 +            accumulate = finishedWriter.abort(accumulate);
 +
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public void prepareToCommit()
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
 +    }
 +
 +    @Override
 +    public void close()
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        finishedWriters.forEach(SSTableMultiWriter::close);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 9fb5f7c,fcc23a2..e320f30
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -29,11 -30,8 +29,11 @@@ import org.apache.cassandra.config.Data
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.compaction.OperationType;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTable;
@@@ -95,23 -90,16 +95,23 @@@ public abstract class SSTableWriter ext
                                         CFMetaData metadata,
                                         MetadataCollector metadataCollector,
                                         SerializationHeader header,
 +                                       Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
          Factory writerFactory = descriptor.getFormat().getWriterFactory();
-         return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
 -        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
++        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
      }
  
 -    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
 +    public static SSTableWriter create(Descriptor descriptor,
 +                                       long keyCount,
 +                                       long repairedAt,
 +                                       int sstableLevel,
 +                                       SerializationHeader header,
 +                                       Collection<Index> indexes,
-                                        LifecycleTransaction txn)
++                                       LifecycleNewTracker lifecycleNewTracker)
      {
          CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-         return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
 -        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
++        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
      }
  
      public static SSTableWriter create(CFMetaData metadata,
@@@ -120,34 -108,21 +120,34 @@@
                                         long repairedAt,
                                         int sstableLevel,
                                         SerializationHeader header,
 +                                       Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
          MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-         return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
 -        return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker);
++        return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker);
      }
  
 -    public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
 +    public static SSTableWriter create(String filename,
 +                                       long keyCount,
 +                                       long repairedAt,
 +                                       int sstableLevel,
 +                                       SerializationHeader header,
 +                                       Collection<Index> indexes,
-                                        LifecycleTransaction txn)
++                                       LifecycleNewTracker lifecycleNewTracker)
      {
-         return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn);
 -        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
++        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
      }
  
      @VisibleForTesting
 -    public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
 +    public static SSTableWriter create(String filename,
 +                                       long keyCount,
 +                                       long repairedAt,
 +                                       SerializationHeader header,
 +                                       Collection<Index> indexes,
-                                        LifecycleTransaction txn)
++                                       LifecycleNewTracker lifecycleNewTracker)
      {
 -        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker);
 +        Descriptor descriptor = Descriptor.fromFilename(filename);
-         return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
++        return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker);
      }
  
      private static Set<Component> components(CFMetaData metadata)
@@@ -344,7 -285,6 +344,7 @@@
                                             CFMetaData metadata,
                                             MetadataCollector metadataCollector,
                                             SerializationHeader header,
 +                                           Collection<SSTableFlushObserver> observers,
-                                            LifecycleTransaction txn);
+                                            LifecycleNewTracker lifecycleNewTracker);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index b62cb11,360ef8a..9af7dc0
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -23,10 -22,13 +23,10 @@@ import java.util.Set
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.format.SSTableFormat;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 -import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.sstable.format.*;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
  import org.apache.cassandra.net.MessagingService;
@@@ -86,10 -88,9 +86,10 @@@ public class BigFormat implements SSTab
                                    CFMetaData metadata,
                                    MetadataCollector metadataCollector,
                                    SerializationHeader header,
 +                                  Collection<SSTableFlushObserver> observers,
-                                   LifecycleTransaction txn)
+                                   LifecycleNewTracker lifecycleNewTracker)
          {
-             return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
 -            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
++            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c3139a3,f733619..9083cd3
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -17,31 -17,23 +17,31 @@@
   */
  package org.apache.cassandra.io.sstable.format.big;
  
 -import java.io.*;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collection;
  import java.util.Map;
 -
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.transform.Transformation;
 -import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import java.util.Optional;
  
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.cache.ChunkCache;
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
  import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@@ -60,25 -56,17 +60,25 @@@ public class BigTableWriter extends SST
      protected final SequentialWriter dataFile;
      private DecoratedKey lastWrittenKey;
      private DataPosition dataMark;
 -
 -    public BigTableWriter(Descriptor descriptor, 
 -                          Long keyCount, 
 -                          Long repairedAt, 
 -                          CFMetaData metadata, 
 +    private long lastEarlyOpenLength = 0;
 +    private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);
 +
 +    private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
 +                                                        .trickleFsync(DatabaseDescriptor.getTrickleFsync())
 +                                                        .trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024)
 +                                                        .build();
 +
 +    public BigTableWriter(Descriptor descriptor,
 +                          long keyCount,
 +                          long repairedAt,
 +                          CFMetaData metadata,
                            MetadataCollector metadataCollector, 
                            SerializationHeader header,
 +                          Collection<SSTableFlushObserver> observers,
-                           LifecycleTransaction txn)
+                           LifecycleNewTracker lifecycleNewTracker)
      {
 -        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
 +        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
-         txn.trackNew(this); // must track before any files are created
+         lifecycleNewTracker.trackNew(this); // must track before any files are created
  
          if (compression)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 6465bf7,07278cb..dbd5a4a
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -34,10 -34,11 +34,11 @@@ import com.ning.compress.lzf.LZFInputSt
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.db.rows.*;
 -import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.Version;
  import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
@@@ -151,11 -152,11 +150,12 @@@ public class StreamReade
      {
          Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
 -            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 -        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
 +            throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
  
-         RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
 -        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata),
 -                session.getReceivingTask(cfId).createLifecycleNewTracker());
++        LifecycleNewTracker lifecycleNewTracker = session.getReceivingTask(cfId).createLifecycleNewTracker();
++        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata));
 +        StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
 +        return writer;
      }
  
      protected long totalSize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 736d30f,c79a711..a426207
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -223,18 -208,12 +223,18 @@@ public class StreamSession implements I
      }
  
  
-     public LifecycleTransaction getTransaction(UUID cfId)
+     StreamReceiveTask getReceivingTask(UUID cfId)
      {
          assert receivers.containsKey(cfId);
-         return receivers.get(cfId).getTransaction();
+         return receivers.get(cfId);
      }
  
 +    private boolean isKeepAliveSupported()
 +    {
 +        CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer);
 +        return STREAM_KEEP_ALIVE.isSupportedBy(peerVersion);
 +    }
 +
      /**
       * Bind this session to report to specific {@link StreamResultFuture} and
       * perform pre-streaming initialization.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 56ac4ba,757add9..7117df1
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -644,9 -642,9 +645,9 @@@ public class ScrubTes
  
      private static class TestMultiWriter extends SimpleSSTableMultiWriter
      {
-         TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
 -        TestMultiWriter(SSTableWriter writer)
++        TestMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker)
          {
-             super(writer, txn);
 -            super(writer);
++            super(writer, lifecycleNewTracker);
          }
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org