You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/10 15:07:39 UTC
[04/11] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0918ba01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0918ba01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0918ba01
Branch: refs/heads/trunk
Commit: 0918ba01176392deadf8655a61dad44979a49ee5
Parents: 6a449b8 3539a07
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 7 12:27:08 2018 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 7 12:27:08 2018 +0000
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 8 ++--
.../compaction/AbstractCompactionStrategy.java | 5 ++-
.../compaction/CompactionStrategyManager.java | 14 +++---
.../db/lifecycle/LifecycleNewTracker.java | 47 ++++++++++++++++++++
.../db/lifecycle/LifecycleTransaction.java | 7 ++-
.../apache/cassandra/db/lifecycle/LogFile.java | 24 ++++------
.../cassandra/db/lifecycle/LogTransaction.java | 2 +-
.../io/sstable/SimpleSSTableMultiWriter.java | 16 +++----
.../sstable/format/RangeAwareSSTableWriter.java | 12 ++---
.../io/sstable/format/SSTableWriter.java | 24 +++++-----
.../io/sstable/format/big/BigFormat.java | 6 +--
.../io/sstable/format/big/BigTableWriter.java | 6 +--
.../cassandra/streaming/StreamReader.java | 6 +--
.../cassandra/streaming/StreamReceiveTask.java | 35 ++++++++++++++-
.../cassandra/streaming/StreamSession.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 5 ++-
16 files changed, 150 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 13f0280,c455c4c..700c1cc
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -507,15 -475,15 +507,15 @@@ public class ColumnFamilyStore implemen
return directories;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
- return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, lifecycleNewTracker);
++ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker);
}
public boolean supportsEarlyOpen()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 8454147,9f07691..3d7800d
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -26,7 -26,8 +26,8 @@@ import com.google.common.collect.Iterab
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.index.Index;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@@ -571,20 -514,9 +572,20 @@@ public abstract class AbstractCompactio
return groupedSSTables;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public CompactionLogger.Strategy strategyLogger()
+ {
+ return CompactionLogger.Strategy.none;
+ }
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ MetadataCollector meta,
+ SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
++ LifecycleNewTracker lifecycleNewTracker)
{
- return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
- return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, lifecycleNewTracker);
++ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, lifecycleNewTracker);
}
public boolean supportsEarlyOpen()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a50f428,1d3d18c..86170a1
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -20,30 -20,20 +20,30 @@@ package org.apache.cassandra.db.compact
import java.util.*;
import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
-
- import org.apache.cassandra.db.DiskBoundaries;
- import org.apache.cassandra.db.Memtable;
- import org.apache.cassandra.index.Index;
+import com.google.common.primitives.Ints;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
++import org.apache.cassandra.db.DiskBoundaries;
+ import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
++import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -1010,43 -490,15 +1010,43 @@@ public class CompactionStrategyManager
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ MetadataCollector collector,
+ SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
++ LifecycleNewTracker lifecycleNewTracker)
{
- if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
{
- return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
+ if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ {
- return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
++ return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
+ }
+ else
+ {
- return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
++ return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
+ }
}
- else
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
+ public boolean isRepaired(AbstractCompactionStrategy strategy)
+ {
+ readLock.lock();
+ try
+ {
+ return repaired.contains(strategy);
+ }
+ finally
{
- return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
+ readLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2217ae2,ded070e..76e4dbb
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@@ -25,9 -25,8 +25,9 @@@ import java.util.UUID
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@@ -35,11 -34,9 +35,11 @@@
public class SimpleSSTableMultiWriter implements SSTableMultiWriter
{
private final SSTableWriter writer;
- private final LifecycleTransaction txn;
++ private final LifecycleNewTracker lifecycleNewTracker;
- protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
- protected SimpleSSTableMultiWriter(SSTableWriter writer)
++ protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker)
{
- this.txn = txn;
++ this.lifecycleNewTracker = lifecycleNewTracker;
this.writer = writer;
}
@@@ -92,7 -89,6 +92,7 @@@
public Throwable abort(Throwable accumulate)
{
- txn.untrackNew(writer);
++ lifecycleNewTracker.untrackNew(writer);
return writer.abort(accumulate);
}
@@@ -113,10 -109,9 +113,10 @@@
CFMetaData cfm,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
- return new SimpleSSTableMultiWriter(writer, txn);
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, lifecycleNewTracker);
- return new SimpleSSTableMultiWriter(writer);
++ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, lifecycleNewTracker);
++ return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 353aacb,0000000..3358225
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -1,208 -1,0 +1,208 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class RangeAwareSSTableWriter implements SSTableMultiWriter
+{
+ private final List<PartitionPosition> boundaries;
+ private final List<Directories.DataDirectory> directories;
+ private final int sstableLevel;
+ private final long estimatedKeys;
+ private final long repairedAt;
+ private final SSTableFormat.Type format;
+ private final SerializationHeader header;
- private final LifecycleTransaction txn;
++ private final LifecycleNewTracker lifecycleNewTracker;
+ private int currentIndex = -1;
+ public final ColumnFamilyStore cfs;
+ private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
+ private final List<SSTableReader> finishedReaders = new ArrayList<>();
+ private SSTableMultiWriter currentWriter = null;
+
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
++ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
+ {
+ DiskBoundaries db = cfs.getDiskBoundaries();
+ directories = db.directories;
+ this.sstableLevel = sstableLevel;
+ this.cfs = cfs;
+ this.estimatedKeys = estimatedKeys / directories.size();
+ this.repairedAt = repairedAt;
+ this.format = format;
- this.txn = txn;
++ this.lifecycleNewTracker = lifecycleNewTracker;
+ this.header = header;
+ boundaries = db.positions;
+ if (boundaries == null)
+ {
+ Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+ if (localDir == null)
+ throw new IOException(String.format("Insufficient disk space to store %s",
+ FBUtilities.prettyPrintMemory(totalSize)));
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
+ }
+ }
+
+ private void maybeSwitchWriter(DecoratedKey key)
+ {
+ if (boundaries == null)
+ return;
+
+ boolean switched = false;
+ while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
+ {
+ switched = true;
+ currentIndex++;
+ }
+
+ if (switched)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
+ }
+ }
+
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ maybeSwitchWriter(partition.partitionKey());
+ return currentWriter.append(partition);
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ return finishedReaders;
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ finishedWriters.forEach((w) -> w.setOpenResult(openResult));
+ currentWriter.setOpenResult(openResult);
+ return this;
+ }
+
+ public String getFilename()
+ {
+ return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ @Override
+ public long getFilePointer()
+ {
+ return currentWriter.getFilePointer();
+ }
+
+ @Override
+ public UUID getCfId()
+ {
+ return currentWriter.getCfId();
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ accumulate = writer.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public Throwable abort(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter finishedWriter : finishedWriters)
+ accumulate = finishedWriter.abort(accumulate);
+
+ return accumulate;
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
+ }
+
+ @Override
+ public void close()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::close);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 9fb5f7c,fcc23a2..e320f30
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -29,11 -30,8 +29,11 @@@ import org.apache.cassandra.config.Data
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.OperationType;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
@@@ -95,23 -90,16 +95,23 @@@ public abstract class SSTableWriter ext
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
++ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
}
- public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public static SSTableWriter create(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ int sstableLevel,
+ SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
++ LifecycleNewTracker lifecycleNewTracker)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
++ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
}
public static SSTableWriter create(CFMetaData metadata,
@@@ -120,34 -108,21 +120,34 @@@
long repairedAt,
int sstableLevel,
SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker);
++ return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker);
}
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public static SSTableWriter create(String filename,
+ long keyCount,
+ long repairedAt,
+ int sstableLevel,
+ SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
++ LifecycleNewTracker lifecycleNewTracker)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn);
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
++ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
}
@VisibleForTesting
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public static SSTableWriter create(String filename,
+ long keyCount,
+ long repairedAt,
+ SerializationHeader header,
+ Collection<Index> indexes,
- LifecycleTransaction txn)
++ LifecycleNewTracker lifecycleNewTracker)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker);
+ Descriptor descriptor = Descriptor.fromFilename(filename);
- return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
++ return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker);
}
private static Set<Component> components(CFMetaData metadata)
@@@ -344,7 -285,6 +344,7 @@@
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
- LifecycleTransaction txn);
+ LifecycleNewTracker lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index b62cb11,360ef8a..9af7dc0
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -23,10 -22,13 +23,10 @@@ import java.util.Set
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.net.MessagingService;
@@@ -86,10 -88,9 +86,10 @@@ public class BigFormat implements SSTab
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
++ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c3139a3,f733619..9083cd3
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -17,31 -17,23 +17,31 @@@
*/
package org.apache.cassandra.io.sstable.format.big;
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import java.util.Optional;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@@ -60,25 -56,17 +60,25 @@@ public class BigTableWriter extends SST
protected final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
private DataPosition dataMark;
-
- public BigTableWriter(Descriptor descriptor,
- Long keyCount,
- Long repairedAt,
- CFMetaData metadata,
+ private long lastEarlyOpenLength = 0;
+ private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);
+
+ private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+ .trickleFsync(DatabaseDescriptor.getTrickleFsync())
+ .trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024)
+ .build();
+
+ public BigTableWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
+ super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
- txn.trackNew(this); // must track before any files are created
+ lifecycleNewTracker.trackNew(this); // must track before any files are created
if (compression)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 6465bf7,07278cb..dbd5a4a
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -34,10 -34,11 +34,11 @@@ import com.ning.compress.lzf.LZFInputSt
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
@@@ -151,11 -152,11 +150,12 @@@ public class StreamReade
{
Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
- throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+ throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
- return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata),
- session.getReceivingTask(cfId).createLifecycleNewTracker());
++ LifecycleNewTracker lifecycleNewTracker = session.getReceivingTask(cfId).createLifecycleNewTracker();
++ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata));
+ StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
+ return writer;
}
protected long totalSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 736d30f,c79a711..a426207
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -223,18 -208,12 +223,18 @@@ public class StreamSession implements I
}
- public LifecycleTransaction getTransaction(UUID cfId)
+ StreamReceiveTask getReceivingTask(UUID cfId)
{
assert receivers.containsKey(cfId);
- return receivers.get(cfId).getTransaction();
+ return receivers.get(cfId);
}
+ private boolean isKeepAliveSupported()
+ {
+ CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer);
+ return STREAM_KEEP_ALIVE.isSupportedBy(peerVersion);
+ }
+
/**
* Bind this session to report to specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 56ac4ba,757add9..7117df1
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -644,9 -642,9 +645,9 @@@ public class ScrubTes
private static class TestMultiWriter extends SimpleSSTableMultiWriter
{
- TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
- TestMultiWriter(SSTableWriter writer)
++ TestMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker)
{
- super(writer, txn);
- super(writer);
++ super(writer, lifecycleNewTracker);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org