You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/10 15:07:42 UTC
[07/11] cassandra git commit: Streaming needs to synchronise access
to LifecycleTransaction
Streaming needs to synchronise access to LifecycleTransaction
patch by Stefania Alborghetti and Benedict; reviewed by Robert Stupp for CASSANDRA-14554
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84ffcb82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84ffcb82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84ffcb82
Branch: refs/heads/trunk
Commit: 84ffcb82a74667b957201f2cdae2d6b308956549
Parents: bbf7dac
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Nov 7 14:07:12 2018 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Dec 10 15:00:00 2018 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 8 ++--
.../compaction/AbstractCompactionStrategy.java | 6 +--
.../compaction/CompactionStrategyManager.java | 8 ++--
.../db/lifecycle/LifecycleNewTracker.java | 47 ++++++++++++++++++++
.../db/lifecycle/LifecycleTransaction.java | 7 ++-
.../apache/cassandra/db/lifecycle/LogFile.java | 24 ++++------
.../cassandra/db/lifecycle/LogTransaction.java | 2 +-
.../io/sstable/SimpleSSTableMultiWriter.java | 6 +--
.../io/sstable/format/SSTableWriter.java | 24 +++++-----
.../io/sstable/format/big/BigFormat.java | 6 +--
.../io/sstable/format/big/BigTableWriter.java | 6 +--
.../cassandra/streaming/StreamReader.java | 6 +--
.../cassandra/streaming/StreamReceiveTask.java | 37 +++++++++++++--
.../cassandra/streaming/StreamSession.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 9 ++--
16 files changed, 140 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f9b59df..01d4789 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.18
+ * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
* Fix cassandra-stress write hang with default options (CASSANDRA-14616)
* Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
* CommitLogReplayer.handleReplayError should print stack traces (CASSANDRA-14589)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4c7bc46..c455c4c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -475,15 +475,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return directories;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, lifecycleNewTracker);
}
public boolean supportsEarlyOpen()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a80a6f4..9f07691 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -27,7 +27,7 @@ import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@ -514,9 +514,9 @@ public abstract class AbstractCompactionStrategy
return groupedSSTables;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, lifecycleNewTracker);
}
public boolean supportsEarlyOpen()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a9bfbd2..1d3d18c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -490,15 +490,15 @@ public class CompactionStrategyManager implements INotificationConsumer
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
- return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
else
{
- return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
new file mode 100644
index 0000000..9a0785c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.db.lifecycle;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
+
+/**
+ * An interface for tracking new sstables added to a LifecycleTransaction, possibly through some proxy.
+ */
+public interface LifecycleNewTracker
+{
+ /**
+ * Called when a new sstable is about to be created, so that it can be tracked by a transaction.
+ * @param table - the new sstable to be tracked
+ */
+ void trackNew(SSTable table);
+
+
+ /**
+ * Called when a new sstable is no longer required, so that it can be untracked by a transaction.
+ * @param table - the sstable to be untracked
+ */
+ void untrackNew(SSTable table);
+
+ /**
+ * @return the type of operation tracking these sstables
+ */
+ OperationType opType();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 582c9d8..af9a80a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -55,7 +55,7 @@ import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
* action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit() phase only take
* actions that can be rolled back.
*/
-public class LifecycleTransaction extends Transactional.AbstractTransactional
+public class LifecycleTransaction extends Transactional.AbstractTransactional implements LifecycleNewTracker
{
private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
@@ -176,6 +176,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return log;
}
+ @Override //LifecycleNewTracker
public OperationType opType()
{
return log.type();
@@ -523,11 +524,15 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return getFirst(originals, null);
}
+ // LifecycleNewTracker
+
+ @Override
public void trackNew(SSTable table)
{
log.trackNew(table);
}
+ @Override
public void untrackNew(SSTable table)
{
log.untrackNew(table);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 8425a6d..e9047ad 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -246,13 +246,11 @@ final class LogFile implements AutoCloseable
void commit()
{
- assert !completed() : "Already completed!";
addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
}
void abort()
{
- assert !completed() : "Already completed!";
addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
}
@@ -281,20 +279,13 @@ final class LogFile implements AutoCloseable
void add(Type type, SSTable table)
{
- add(makeRecord(type, table));
- }
-
- void add(LogRecord record)
- {
- if (!addRecord(record))
- throw new IllegalStateException();
+ addRecord(makeRecord(type, table));
}
public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
{
for (LogRecord record : makeRecords(type, toBulkAdd).values())
- if (!addRecord(record))
- throw new IllegalStateException();
+ addRecord(record);
}
Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
@@ -332,14 +323,17 @@ final class LogFile implements AutoCloseable
return record.asType(type);
}
- private boolean addRecord(LogRecord record)
+ void addRecord(LogRecord record)
{
+ if (completed())
+ throw new IllegalStateException("Transaction already completed");
+
if (records.contains(record))
- return false;
+ throw new IllegalStateException("Record already exists");
replicas.append(record);
-
- return records.add(record);
+ if (!records.add(record))
+ throw new IllegalStateException("Failed to add record");
}
void remove(Type type, SSTable table)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index a10bcd2..00a222a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -164,7 +164,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
return new SSTableTidier(reader, true, this);
}
- txnFile.add(logRecord);
+ txnFile.addRecord(logRecord);
if (tracker != null)
tracker.notifyDeleting(reader);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index fd1b9a7..ded070e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -25,7 +25,7 @@ import java.util.UUID;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -109,9 +109,9 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
CFMetaData cfm,
MetadataCollector metadataCollector,
SerializationHeader header,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, lifecycleNewTracker);
return new SimpleSSTableMultiWriter(writer);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 5f35029..fcc23a2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -90,16 +90,16 @@ public abstract class SSTableWriter extends SSTable implements Transactional
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
}
- public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
}
public static SSTableWriter create(CFMetaData metadata,
@@ -108,21 +108,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long repairedAt,
int sstableLevel,
SerializationHeader header,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+ return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker);
}
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, txn);
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
}
@VisibleForTesting
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleTransaction txn)
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, txn);
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker);
}
private static Set<Component> components(CFMetaData metadata)
@@ -285,6 +285,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
- LifecycleTransaction txn);
+ LifecycleNewTracker lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index ae93c5f..360ef8a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -22,7 +22,7 @@ import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -88,9 +88,9 @@ public class BigFormat implements SSTableFormat
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 0d500c1..f733619 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,12 +21,12 @@ import java.io.*;
import java.util.Map;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
@@ -63,10 +63,10 @@ public class BigTableWriter extends SSTableWriter
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
- txn.trackNew(this); // must track before any files are created
+ lifecycleNewTracker.trackNew(this); // must track before any files are created
if (compression)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 4ca7937..07278cb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -34,6 +34,7 @@ import com.ning.compress.lzf.LZFInputStream;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -49,8 +50,6 @@ import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
/**
* StreamReader reads from stream and writes to SSTable.
*/
@@ -156,7 +155,8 @@ public class StreamReader
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
- return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
+ return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata),
+ session.getReceivingTask(cfId).createLifecycleNewTracker());
}
protected long totalSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9e65d34..ea82d9b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -132,11 +135,39 @@ public class StreamReceiveTask extends StreamTask
return totalSize;
}
- public synchronized LifecycleTransaction getTransaction()
+ /**
+ * @return a LifecycleNewTracker whose operations are synchronised on this StreamReceiveTask.
+ */
+ public synchronized LifecycleNewTracker createLifecycleNewTracker()
{
if (done)
- throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
- return txn;
+ throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
+
+ return new LifecycleNewTracker()
+ {
+ @Override
+ public void trackNew(SSTable table)
+ {
+ synchronized (StreamReceiveTask.this)
+ {
+ txn.trackNew(table);
+ }
+ }
+
+ @Override
+ public void untrackNew(SSTable table)
+ {
+ synchronized (StreamReceiveTask.this)
+ {
+ txn.untrackNew(table);
+ }
+ }
+
+ public OperationType opType()
+ {
+ return txn.opType();
+ }
+ };
}
private static class OnCompletionRunnable implements Runnable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index d57fae8..c79a711 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -208,10 +208,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
- public LifecycleTransaction getTransaction(UUID cfId)
+ StreamReceiveTask getReceivingTask(UUID cfId)
{
assert receivers.containsKey(cfId);
- return receivers.get(cfId).getTransaction();
+ return receivers.get(cfId);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index fc2faea..757add9 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -632,11 +633,11 @@ public class ScrubTest
assertOrdered(Util.cmd(cfs).filterOn(colName, Operator.EQ, 1L).build(), numRows / 2);
}
- private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, CFMetaData metadata, LifecycleTransaction txn)
+ private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, CFMetaData metadata, LifecycleNewTracker lifecycleNewTracker)
{
SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0);
- return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn));
+ return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, lifecycleNewTracker));
}
private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -653,9 +654,9 @@ public class ScrubTest
private static class TestWriter extends BigTableWriter
{
TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata,
- MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
- super(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+ super(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org