You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2015/11/19 23:19:17 UTC
cassandra git commit: Add sstable flush observer
Repository: cassandra
Updated Branches:
refs/heads/trunk b636b0742 -> f81a91d3f
Add sstable flush observer
patch by Pavel Yaskevich; reviewed by Sam Tunnicliffe for CASSANDRA-10678
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f81a91d3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f81a91d3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f81a91d3
Branch: refs/heads/trunk
Commit: f81a91d3fe0d1cd93f093c74356a1d7d018ed22f
Parents: b636b07
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Fri Nov 6 18:38:47 2015 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Thu Nov 19 14:16:59 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../org/apache/cassandra/db/ColumnIndex.java | 18 +-
.../compaction/AbstractCompactionStrategy.java | 11 +-
.../db/compaction/CompactionManager.java | 2 +
.../compaction/CompactionStrategyManager.java | 13 +-
.../cassandra/db/compaction/Upgrader.java | 1 +
.../writers/DefaultCompactionWriter.java | 1 +
.../writers/MajorLeveledCompactionWriter.java | 1 +
.../writers/MaxSSTableSizeWriter.java | 1 +
.../SplittingSizeTieredCompactionWriter.java | 1 +
src/java/org/apache/cassandra/index/Index.java | 15 ++
.../io/sstable/AbstractSSTableSimpleWriter.java | 4 +-
.../cassandra/io/sstable/SSTableTxnWriter.java | 11 +-
.../io/sstable/SimpleSSTableMultiWriter.java | 4 +-
.../io/sstable/format/SSTableFlushObserver.java | 55 +++++
.../io/sstable/format/SSTableWriter.java | 83 +++++--
.../io/sstable/format/big/BigFormat.java | 9 +-
.../io/sstable/format/big/BigTableWriter.java | 8 +-
.../apache/cassandra/db/RowIndexEntryTest.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 2 +-
.../db/lifecycle/RealTransactionsTest.java | 1 +
.../io/sstable/SSTableRewriterTest.java | 2 +-
.../format/SSTableFlushObserverTest.java | 217 +++++++++++++++++++
24 files changed, 425 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd68a3c..d4efbcc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Add sstable flush observer (CASSANDRA-10678)
* Improve NTS endpoints calculation (CASSANDRA-10200)
* Improve performance of the folderSize function (CASSANDRA-10677)
* Add support for type casting in selection clause (CASSANDRA-10310)
@@ -6,7 +7,6 @@
* Abort in-progress queries that time out (CASSANDRA-7392)
* Add transparent data encryption core classes (CASSANDRA-9945)
-
3.1
Merged from 3.0:
* Correctly preserve deletion info on updated rows when notifying indexers
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 23cede4..08ce2dd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -469,7 +469,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
{
- return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
}
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index ede3f79..749c155 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,11 +45,15 @@ public class ColumnIndex
this.columnsIndex = columnsIndex;
}
- public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException
+ public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
+ SequentialWriter output,
+ SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
+ Version version) throws IOException
{
assert !iterator.isEmpty() && version.storeRows();
- Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion());
+ Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
return builder.build();
}
@@ -83,15 +88,19 @@ public class ColumnIndex
private DeletionTime openMarker;
+ private final Collection<SSTableFlushObserver> observers;
+
public Builder(UnfilteredRowIterator iterator,
SequentialWriter writer,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
int version)
{
this.iterator = iterator;
this.writer = writer;
this.header = header;
this.version = version;
+ this.observers = observers == null ? Collections.emptyList() : observers;
this.initialPosition = writer.position();
}
@@ -142,6 +151,11 @@ public class ColumnIndex
}
UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+
+ // notify observers about each new cell added to the row
+ if (!observers.isEmpty() && unfiltered.isRow())
+ ((Row) unfiltered).stream().forEach(cell -> observers.forEach((o) -> o.nextCell(cell)));
+
lastClustering = unfiltered.clustering();
previousRowStart = pos;
++written;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index ae8839e..cab56bb 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@ -509,8 +510,14 @@ public abstract class AbstractCompactionStrategy
return groupedSSTables;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ MetadataCollector meta,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
- return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 559a2ea..02d6aa1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -963,6 +963,7 @@ public class CompactionManager implements CompactionManagerMBean
repairedAt,
sstable.getSSTableLevel(),
sstable.header,
+ cfs.indexManager.listIndexes(),
txn);
}
@@ -995,6 +996,7 @@ public class CompactionManager implements CompactionManagerMBean
cfs.metadata,
new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
SerializationHeader.make(cfs.metadata, sstables),
+ cfs.indexManager.listIndexes(),
txn);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index bd72c64..7c7e86a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -22,6 +22,7 @@ import java.util.*;
import java.util.concurrent.Callable;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.index.Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -499,15 +500,21 @@ public class CompactionStrategyManager implements INotificationConsumer
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ MetadataCollector collector,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
- return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
else
{
- return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index fcd1a3c..3f0f9a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -75,6 +75,7 @@ public class Upgrader
cfs.metadata,
sstableMetadataCollector,
SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
+ cfs.indexManager.listIndexes(),
transaction);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8b90224..8b4351f 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -67,6 +67,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
cfs.metadata,
new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ cfs.indexManager.listIndexes(),
txn);
sstableWriter.switchWriter(writer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 6d191f8..b0c4562 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -106,6 +106,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ cfs.indexManager.listIndexes(),
txn);
sstableWriter.switchWriter(writer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 142fe87..1dc72e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -87,6 +87,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ cfs.indexManager.listIndexes(),
txn);
sstableWriter.switchWriter(writer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 796391c..3a7f526 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -111,6 +111,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ cfs.indexManager.listIndexes(),
txn);
logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
sstableWriter.switchWriter(writer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 8655044..7bca924 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -7,6 +7,7 @@ import java.util.function.BiFunction;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -15,6 +16,8 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -185,6 +188,18 @@ public interface Index
*/
public boolean shouldBuildBlocking();
+ /**
+ * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction).
+ *
+ * @param descriptor The descriptor of the sstable observer is requested for.
+ * @param opType The type of the operation which requests observer e.g. memtable flush or compaction.
+ *
+ * @return SSTable flush observer.
+ */
+ default SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationType opType)
+ {
+ return null;
+ }
/*
* Index selection
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 62348ec..0213fd5 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Closeable;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,7 +66,8 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
0,
- new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
+ new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS),
+ Collections.emptySet());
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index e889d85..5286ac5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -102,12 +103,18 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
}
@SuppressWarnings("resource") // log and writer closed during postCleanup
- public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(CFMetaData cfm,
+ Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ int sstableLevel,
+ SerializationHeader header,
+ Collection<Index> indexes)
{
// if the column family store does not exist, we create a new default SSTableMultiWriter to use:
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
- SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
+ SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, indexes, txn);
return new SSTableTxnWriter(txn, writer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index fd1b9a7..68dbd74 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -109,9 +110,10 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
CFMetaData cfm,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<Index> indexes,
LifecycleTransaction txn)
{
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
return new SimpleSSTableMultiWriter(writer);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
new file mode 100644
index 0000000..d6f54e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.ColumnData;
+
+/**
+ * Observer for events in the lifecycle of writing out an sstable.
+ */
+public interface SSTableFlushObserver
+{
+ /**
+ * Called before writing any data to the sstable.
+ */
+ void begin();
+
+ /**
+ * Called when a new partition in being written to the sstable,
+ * but before any cells are processed (see {@link #nextCell(ColumnData)}).
+ *
+ * @param key The key being appended to SSTable.
+ * @param indexPosition The position of the key in the SSTable PRIMARY_INDEX file.
+ */
+ void startPartition(DecoratedKey key, long indexPosition);
+
+ /**
+ * Called after the cell is written to the sstable.
+ * Will be preceded by a call to {@code startPartition(DecoratedKey, long)},
+ * and the cell should be assumed to belong to that row.
+ *
+ * @param cell The cell being added to the row.
+ */
+ void nextCell(ColumnData cell);
+
+ /**
+ * Called when all data is written to the file and it's ready to be finished up.
+ */
+ void complete();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 4cbbd70..3203964 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -18,20 +18,21 @@
package org.apache.cassandra.io.sstable.format;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
@@ -58,6 +59,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
protected final SerializationHeader header;
protected final TransactionalProxy txnProxy = txnProxy();
+ protected final Collection<SSTableFlushObserver> observers;
protected abstract TransactionalProxy txnProxy();
@@ -69,12 +71,13 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected boolean openResult;
}
- protected SSTableWriter(Descriptor descriptor,
- long keyCount,
- long repairedAt,
- CFMetaData metadata,
- MetadataCollector metadataCollector,
- SerializationHeader header)
+ protected SSTableWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ CFMetaData metadata,
+ MetadataCollector metadataCollector,
+ SerializationHeader header,
+ Collection<SSTableFlushObserver> observers)
{
super(descriptor, components(metadata), metadata);
this.keyCount = keyCount;
@@ -82,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
this.metadataCollector = metadataCollector;
this.header = header;
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
+ this.observers = observers == null ? Collections.emptySet() : observers;
}
public static SSTableWriter create(Descriptor descriptor,
@@ -90,16 +94,23 @@ public abstract class SSTableWriter extends SSTable implements Transactional
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<Index> indexes,
LifecycleTransaction txn)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
}
- public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ public static SSTableWriter create(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ int sstableLevel,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
}
public static SSTableWriter create(CFMetaData metadata,
@@ -108,21 +119,34 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long repairedAt,
int sstableLevel,
SerializationHeader header,
+ Collection<Index> indexes,
LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+ return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
}
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
+ public static SSTableWriter create(String filename,
+ long keyCount,
+ long repairedAt,
+ int sstableLevel,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, txn);
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn);
}
@VisibleForTesting
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleTransaction txn)
+ public static SSTableWriter create(String filename,
+ long keyCount,
+ long repairedAt,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, txn);
+ Descriptor descriptor = Descriptor.fromFilename(filename);
+ return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
}
private static Set<Component> components(CFMetaData metadata)
@@ -150,6 +174,27 @@ public abstract class SSTableWriter extends SSTable implements Transactional
return components;
}
+ private static Collection<SSTableFlushObserver> observers(Descriptor descriptor,
+ Collection<Index> indexes,
+ OperationType operationType)
+ {
+ if (indexes == null)
+ return Collections.emptyList();
+
+ List<SSTableFlushObserver> observers = new ArrayList<>(indexes.size());
+ for (Index index : indexes)
+ {
+ SSTableFlushObserver observer = index.getFlushObserver(descriptor, operationType);
+ if (observer != null)
+ {
+ observer.begin();
+ observers.add(observer);
+ }
+ }
+
+ return ImmutableList.copyOf(observers);
+ }
+
public abstract void mark();
/**
@@ -211,6 +256,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
{
setOpenResult(openResult);
txnProxy.finish();
+ observers.forEach(SSTableFlushObserver::complete);
return finished();
}
@@ -285,6 +331,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 1f2a98f..e030b5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable.format.big;
+import java.util.Collection;
import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
@@ -25,10 +26,7 @@ import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.net.MessagingService;
@@ -88,9 +86,10 @@ public class BigFormat implements SSTableFormat
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b6077e0..5fe9147 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,12 +18,14 @@
package org.apache.cassandra.io.sstable.format.big;
import java.io.*;
+import java.util.Collection;
import java.util.Map;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -63,9 +65,10 @@ public class BigTableWriter extends SSTableWriter
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
+ Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
+ super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
txn.trackNew(this); // must track before any files are created
if (compression)
@@ -143,10 +146,11 @@ public class BigTableWriter extends SSTableWriter
return null;
long startPosition = beforeAppend(key);
+ observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
{
- ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, descriptor.version);
+ ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, observers, descriptor.version);
RowIndexEntry entry = RowIndexEntry.create(startPosition, collecting.partitionLevelDeletion(), index);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 62c88a0..0c7ee59 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -135,7 +135,7 @@ public class RowIndexEntryTest extends CQLTester
File tempFile = File.createTempFile("row_index_entry_test", null);
tempFile.deleteOnExit();
SequentialWriter writer = SequentialWriter.open(tempFile);
- ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, BigFormat.latestVersion);
+ ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfs.metadata, BigFormat.latestVersion, header);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index d5baec8..27b774d 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -652,7 +652,7 @@ public class ScrubTest
TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata,
MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+ super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 4fbbb36..bab9c90 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -168,6 +168,7 @@ public class RealTransactionsTest extends SchemaLoader
0,
0,
SerializationHeader.make(cfs.metadata, txn.originals()),
+ cfs.indexManager.listIndexes(),
txn));
while (ci.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..de9b357 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
{
String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+ return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
}
public static ByteBuffer random(int i, int size)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
new file mode 100644
index 0000000..29ad387
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class SSTableFlushObserverTest
+{
+ private static final String KS_NAME = "test";
+ private static final String CF_NAME = "flush_observer";
+
+ @Test
+ public void testFlushObserver()
+ {
+ CFMetaData cfm = CFMetaData.Builder.create(KS_NAME, CF_NAME)
+ .addPartitionKey("id", UTF8Type.instance)
+ .addRegularColumn("first_name", UTF8Type.instance)
+ .addRegularColumn("age", Int32Type.instance)
+ .addRegularColumn("height", LongType.instance)
+ .build();
+
+ LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.COMPACTION);
+ FlushObserver observer = new FlushObserver();
+
+ String sstableDirectory = DatabaseDescriptor.getAllDataFileLocations()[0];
+ File directory = new File(sstableDirectory + File.pathSeparator + KS_NAME + File.pathSeparator + CF_NAME);
+ directory.deleteOnExit();
+
+ if (!directory.exists() && !directory.mkdirs())
+ throw new FSWriteError(new IOException("failed to create tmp directory"), directory.getAbsolutePath());
+
+ SSTableFormat.Type sstableFormat = DatabaseDescriptor.getSSTableFormat();
+
+ BigTableWriter writer = new BigTableWriter(new Descriptor(sstableFormat.info.getLatestVersion().version,
+ directory,
+ KS_NAME, CF_NAME,
+ 0,
+ sstableFormat),
+ 10L, 0L, cfm,
+ new MetadataCollector(cfm.comparator).sstableLevel(0),
+ new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS),
+ Collections.singletonList(observer),
+ transaction);
+
+ SSTableReader reader = null;
+ Multimap<ByteBuffer, Cell> expected = ArrayListMultimap.create();
+
+ try
+ {
+ final long now = System.currentTimeMillis();
+
+ ByteBuffer key = UTF8Type.instance.fromString("key1");
+ expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("jack")),
+ BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(27)),
+ BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(183L))));
+
+ writer.append(new RowIterator(cfm, key.duplicate(), Collections.singletonList(buildRow(expected.get(key)))));
+
+ key = UTF8Type.instance.fromString("key2");
+ expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("jim")),
+ BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)),
+ BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(180L))));
+
+ writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
+
+ key = UTF8Type.instance.fromString("key3");
+ expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("ken")),
+ BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)),
+ BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));
+
+ writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
+
+ reader = writer.finish(true);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(writer);
+ }
+
+ Assert.assertTrue(observer.isComplete);
+ Assert.assertEquals(expected.size(), observer.rows.size());
+
+ for (Pair<ByteBuffer, Long> e : observer.rows.keySet())
+ {
+ ByteBuffer key = e.left;
+ Long indexPosition = e.right;
+
+ try (FileDataInput index = reader.ifile.createReader(indexPosition))
+ {
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(index);
+ Assert.assertEquals(0, UTF8Type.instance.compare(key, indexKey));
+ }
+ catch (IOException ex)
+ {
+ throw new FSReadError(ex, reader.getIndexFilename());
+ }
+
+ Assert.assertEquals(expected.get(key), observer.rows.get(e));
+ }
+ }
+
+ private static class RowIterator extends AbstractUnfilteredRowIterator
+ {
+ private final Iterator<Unfiltered> rows;
+
+ public RowIterator(CFMetaData cfm, ByteBuffer key, Collection<Unfiltered> content)
+ {
+ super(cfm,
+ DatabaseDescriptor.getPartitioner().decorateKey(key),
+ DeletionTime.LIVE,
+ cfm.partitionColumns(),
+ BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING),
+ false,
+ EncodingStats.NO_STATS);
+
+ rows = content.iterator();
+ }
+
+ @Override
+ protected Unfiltered computeNext()
+ {
+ return rows.hasNext() ? rows.next() : endOfData();
+ }
+ }
+
+ private static class FlushObserver implements SSTableFlushObserver
+ {
+ private final Multimap<Pair<ByteBuffer, Long>, Cell> rows = ArrayListMultimap.create();
+ private Pair<ByteBuffer, Long> currentKey;
+ private boolean isComplete;
+
+ @Override
+ public void begin()
+ {}
+
+ @Override
+ public void startPartition(DecoratedKey key, long indexPosition)
+ {
+ currentKey = Pair.create(key.getKey(), indexPosition);
+ }
+
+ @Override
+ public void nextCell(ColumnData cell)
+ {
+ rows.put(currentKey, (Cell) cell);
+ }
+
+ @Override
+ public void complete()
+ {
+ isComplete = true;
+ }
+ }
+
+ private static Row buildRow(Collection<Cell> cells)
+ {
+ Row.Builder rowBuilder = BTreeRow.sortedBuilder();
+ rowBuilder.newRow(Clustering.EMPTY);
+ cells.forEach(rowBuilder::addCell);
+ return rowBuilder.build();
+ }
+
+ private static ColumnDefinition getColumn(CFMetaData cfm, String name)
+ {
+ return cfm.getColumnDefinition(UTF8Type.instance.fromString(name));
+ }
+}