Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:39 UTC
[15/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
deleted file mode 100644
index 6db9c3d..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
- * It assumes that the CF is sorted by name and exploits the name index.
- */
-class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
- private final ColumnFamily emptyColumnFamily;
-
- private final SSTableReader sstable;
- private final List<IndexHelper.IndexInfo> indexes;
- private final FileDataInput originalInput;
- private FileDataInput file;
- private final boolean reversed;
- private final ColumnSlice[] slices;
- private final BlockFetcher fetcher;
- private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
- private final CellNameType comparator;
-
- // Holds range tombstones in reversed queries. See addColumn()
- private final Deque<OnDiskAtom> rangeTombstonesReversed;
-
- /**
- * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in
- * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of
- * finish (reverse start) elements, i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also
- * assumes that validation has been performed in terms of intervals (no overlapping intervals).
- */
- IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed)
- {
- Tracing.trace("Seeking to partition indexed section in data file");
- this.sstable = sstable;
- this.originalInput = input;
- this.reversed = reversed;
- this.slices = slices;
- this.comparator = sstable.metadata.comparator;
- this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null;
-
- try
- {
- this.indexes = indexEntry.columnsIndex();
- emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
- if (indexes.isEmpty())
- {
- setToRowStart(indexEntry, input);
- emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
- fetcher = new SimpleBlockFetcher();
- }
- else
- {
- emptyColumnFamily.delete(indexEntry.deletionTime());
- fetcher = new IndexedBlockFetcher(indexEntry.position);
- }
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new CorruptSSTableException(e, file.getPath());
- }
- }
-
- /**
- * Sets the seek position to the start of the row for column scanning.
- */
- private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
- {
- if (in == null)
- {
- this.file = sstable.getFileDataInput(rowEntry.position);
- }
- else
- {
- this.file = in;
- in.seek(rowEntry.position);
- }
- sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
- }
-
- public ColumnFamily getColumnFamily()
- {
- return emptyColumnFamily;
- }
-
- public DecoratedKey getKey()
- {
- throw new UnsupportedOperationException();
- }
-
- protected OnDiskAtom computeNext()
- {
- while (true)
- {
- if (reversed)
- {
- // Return all tombstones for the block first (see addColumn() below)
- OnDiskAtom column = rangeTombstonesReversed.poll();
- if (column != null)
- return column;
- }
-
- OnDiskAtom column = blockColumns.poll();
- if (column == null)
- {
- if (!fetcher.fetchMoreData())
- return endOfData();
- }
- else
- {
- return column;
- }
- }
- }
-
- public void close() throws IOException
- {
- if (originalInput == null && file != null)
- file.close();
- }
-
- protected void addColumn(OnDiskAtom col)
- {
- if (reversed)
- {
- /*
- * We put range tombstone markers at the beginning of the range they delete. But for reversed queries,
- * the caller still needs to know about a RangeTombstone before it sees any column that it covers.
- * To make that simple, we keep said tombstones separate and return them all before any column for
- * a given block.
- */
- if (col instanceof RangeTombstone)
- rangeTombstonesReversed.addFirst(col);
- else
- blockColumns.addFirst(col);
- }
- else
- {
- blockColumns.addLast(col);
- }
- }
-
- private abstract class BlockFetcher
- {
- protected int currentSliceIdx;
-
- protected BlockFetcher(int sliceIdx)
- {
- this.currentSliceIdx = sliceIdx;
- }
-
- /*
- * Return the smallest key selected by the current ColumnSlice.
- */
- protected Composite currentStart()
- {
- return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
- }
-
- /*
- * Return the biggest key selected by the current ColumnSlice.
- */
- protected Composite currentFinish()
- {
- return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
- }
-
- protected abstract boolean setNextSlice();
-
- protected abstract boolean fetchMoreData();
-
- protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
- {
- return isBeforeSliceStart(column.name());
- }
-
- protected boolean isBeforeSliceStart(Composite name)
- {
- Composite start = currentStart();
- return !start.isEmpty() && comparator.compare(name, start) < 0;
- }
-
- protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
- {
- Composite finish = currentFinish();
- return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
- }
-
- protected boolean isAfterSliceFinish(Composite name)
- {
- Composite finish = currentFinish();
- return !finish.isEmpty() && comparator.compare(name, finish) > 0;
- }
- }
-
- private class IndexedBlockFetcher extends BlockFetcher
- {
- // where this row starts
- private final long columnsStart;
-
- // the index entry for the next block to deserialize
- private int nextIndexIdx = -1;
-
- // index of the last block we've read from disk;
- private int lastDeserializedBlock = -1;
-
- // For reversed, keep columns at the beginning of the last deserialized block that
- // may still match a slice
- private final Deque<OnDiskAtom> prefetched;
-
- public IndexedBlockFetcher(long columnsStart)
- {
- super(-1);
- this.columnsStart = columnsStart;
- this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
- setNextSlice();
- }
-
- protected boolean setNextSlice()
- {
- while (++currentSliceIdx < slices.length)
- {
- nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
- if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
- // no index block for that slice
- continue;
-
- // Check if we can exclude this slice entirely from the index
- IndexInfo info = indexes.get(nextIndexIdx);
- if (reversed)
- {
- if (!isBeforeSliceStart(info.lastName))
- return true;
- }
- else
- {
- if (!isAfterSliceFinish(info.firstName))
- return true;
- }
- }
- nextIndexIdx = -1;
- return false;
- }
-
- protected boolean hasMoreSlice()
- {
- return currentSliceIdx < slices.length;
- }
-
- protected boolean fetchMoreData()
- {
- if (!hasMoreSlice())
- return false;
-
- // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
- // Note that prefetched keeps columns in reversed disk order.
- // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
- // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
- // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
- // can be mistakenly added this way.
- if (reversed && !prefetched.isEmpty())
- {
- // Avoids some comparisons when we know they're not useful
- boolean inSlice = false;
-
- OnDiskAtom prefetchedCol;
- while ((prefetchedCol = prefetched.peek()) != null)
- {
- // col is before the slice, we may need to update the slice
- if (isColumnBeforeSliceStart(prefetchedCol))
- {
- inSlice = false;
-
- // As explained above, we add RT unconditionally
- if (prefetchedCol instanceof RangeTombstone)
- {
- blockColumns.addLast(prefetched.poll());
- continue;
- }
-
- // Otherwise, we move to the next slice. If we have no more slices, we
- // simply unwind prefetched entirely and add all RTs.
- if (!setNextSlice())
- {
- while ((prefetchedCol = prefetched.poll()) != null)
- if (prefetchedCol instanceof RangeTombstone)
- blockColumns.addLast(prefetchedCol);
- break;
- }
-
- }
- // col is within the slice: keep it
- // (we go in reverse, so as soon as we are in a slice, no need to check
- // that we're after the slice until we change slice)
- else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
- {
- blockColumns.addLast(prefetched.poll());
- inSlice = true;
- }
- // if col is after slice, ignore
- else
- {
- prefetched.poll();
- }
- }
-
- if (!blockColumns.isEmpty())
- return true;
- else if (!hasMoreSlice())
- return false;
- }
- try
- {
- return getNextBlock();
- }
- catch (IOException e)
- {
- throw new CorruptSSTableException(e, file.getPath());
- }
- }
-
- private boolean getNextBlock() throws IOException
- {
- if (lastDeserializedBlock == nextIndexIdx)
- {
- if (reversed)
- nextIndexIdx--;
- else
- nextIndexIdx++;
- }
- lastDeserializedBlock = nextIndexIdx;
-
- // Are we done?
- if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
- return false;
-
- IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
-
- /* seek to the correct offset to the data, and calculate the data size */
- long positionToSeek = columnsStart + currentIndex.offset;
-
- // With new promoted indexes, our first seek in the data file will happen at that point.
- if (file == null)
- file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
-
- AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-
- file.seek(positionToSeek);
- FileMark mark = file.mark();
-
- // We remember when we are within a slice to avoid some comparisons
- boolean inSlice = false;
-
- // scan from index start
- while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
- {
- // col is before slice
- // (If in slice, don't bother checking that until we change slice)
- Composite start = currentStart();
- if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
- {
- // If it's a rangeTombstone, then we need to read it and include it unless its end
- // stops before our slice start.
- if (deserializer.nextIsRangeTombstone())
- {
- RangeTombstone rt = (RangeTombstone)deserializer.readNext();
- if (comparator.compare(rt.max, start) >= 0)
- addColumn(rt);
- continue;
- }
-
- if (reversed)
- {
- // the next slice selects columns that are before the current one, so it may
- // match this column; keep it around.
- prefetched.addFirst(deserializer.readNext());
- }
- else
- {
- deserializer.skipNext();
- }
- }
- // col is within slice
- else
- {
- Composite finish = currentFinish();
- if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
- {
- inSlice = true;
- addColumn(deserializer.readNext());
- }
- // col is after slice.
- else
- {
- // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
- // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
- // index block. However, we can be sure that we are in the first case (the current slice is done) if the first
- // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
- if (reversed && prefetched.isEmpty())
- break;
-
- if (!setNextSlice())
- break;
-
- inSlice = false;
-
- // The next index block now corresponds to the first block that may have columns for the newly set slice.
- // So if it's different from the current block, we're done with this block. And in that case, we know
- // that our prefetched columns won't match.
- if (nextIndexIdx != lastDeserializedBlock)
- {
- if (reversed)
- prefetched.clear();
- break;
- }
-
- // Even if the next slice may have columns in this block, if we're reversed, those columns have been
- // prefetched and we're done with this block
- if (reversed)
- break;
-
- // otherwise, we will deal with that column at the next iteration
- }
- }
- }
- return true;
- }
- }
-
- private class SimpleBlockFetcher extends BlockFetcher
- {
- public SimpleBlockFetcher() throws IOException
- {
- // Since we have to deserialize in order and will read all slices, we might as well reverse the slices and
- // behave as if the query was not reversed
- super(reversed ? slices.length - 1 : 0);
-
- // We remember when we are within a slice to avoid some comparisons
- boolean inSlice = false;
-
- AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
- while (deserializer.hasNext())
- {
- // col is before slice
- // (If in slice, don't bother checking that until we change slice)
- Composite start = currentStart();
- if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
- {
- // If it's a rangeTombstone, then we need to read it and include it unless its end
- // stops before our slice start. Otherwise, we can skip it.
- if (deserializer.nextIsRangeTombstone())
- {
- RangeTombstone rt = (RangeTombstone)deserializer.readNext();
- if (comparator.compare(rt.max, start) >= 0)
- addColumn(rt);
- }
- else
- {
- deserializer.skipNext();
- }
- continue;
- }
-
- // col is within slice
- Composite finish = currentFinish();
- if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
- {
- inSlice = true;
- addColumn(deserializer.readNext());
- }
- // col is after slice. more slices?
- else
- {
- inSlice = false;
- if (!setNextSlice())
- break;
- }
- }
- }
-
- protected boolean setNextSlice()
- {
- if (reversed)
- {
- if (currentSliceIdx <= 0)
- return false;
-
- currentSliceIdx--;
- }
- else
- {
- if (currentSliceIdx >= slices.length - 1)
- return false;
-
- currentSliceIdx++;
- }
- return true;
- }
-
- protected boolean fetchMoreData()
- {
- return false;
- }
- }
-}
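
The slice-ordering contract described in the constructor comment above is easiest to see with concrete values. Below is a minimal, self-contained sketch: plain Strings stand in for Cassandra's Composite names, and the Slice class is a hypothetical stand-in for ColumnSlice, not code from the tree.

import java.util.Arrays;

public class SliceOrderingExample
{
    // Hypothetical stand-in for ColumnSlice: a [start, finish] name interval.
    static final class Slice
    {
        final String start, finish;
        Slice(String start, String finish) { this.start = start; this.finish = finish; }
        public String toString() { return "[" + start + "," + finish + "]"; }
    }

    public static void main(String[] args)
    {
        // Forward lookup: slices in lexicographic order of their start elements.
        Slice[] forward = { new Slice("a", "b"), new Slice("d", "e"), new Slice("g", "h") };

        // Reverse lookup: each slice is flipped and the array is in reverse
        // lexicographic order of the finish (reverse start) elements.
        Slice[] reversed = { new Slice("h", "g"), new Slice("e", "d"), new Slice("b", "a") };

        System.out.println(Arrays.toString(forward));  // [[a,b], [d,e], [g,h]]
        System.out.println(Arrays.toString(reversed)); // [[h,g], [e,d], [b,a]]
    }
}

In both cases the reader additionally assumes the intervals do not overlap; that validation happens before construction.
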
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
deleted file mode 100644
index b8910c7..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
- private ColumnFamily cf;
- private final SSTableReader sstable;
- private FileDataInput fileToClose;
- private Iterator<OnDiskAtom> iter;
- public final SortedSet<CellName> columns;
- public final DecoratedKey key;
-
- public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
- {
- assert columns != null;
- this.sstable = sstable;
- this.columns = columns;
- this.key = key;
-
- RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
- if (indexEntry == null)
- return;
-
- try
- {
- read(sstable, null, indexEntry);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new CorruptSSTableException(e, sstable.getFilename());
- }
- finally
- {
- if (fileToClose != null)
- FileUtils.closeQuietly(fileToClose);
- }
- }
-
- public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
- {
- assert columns != null;
- this.sstable = sstable;
- this.columns = columns;
- this.key = key;
-
- try
- {
- read(sstable, file, indexEntry);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new CorruptSSTableException(e, sstable.getFilename());
- }
- }
-
- private FileDataInput createFileDataInput(long position)
- {
- fileToClose = sstable.getFileDataInput(position);
- return fileToClose;
- }
-
- @SuppressWarnings("resource")
- private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
- throws IOException
- {
- List<IndexHelper.IndexInfo> indexList;
-
- // If the entry is not indexed or the index is not promoted, read from the row start
- if (!indexEntry.isIndexed())
- {
- if (file == null)
- file = createFileDataInput(indexEntry.position);
- else
- file.seek(indexEntry.position);
-
- DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
- assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
- }
-
- indexList = indexEntry.columnsIndex();
-
- if (!indexEntry.isIndexed())
- {
- ColumnFamilySerializer serializer = ColumnFamily.serializer;
- try
- {
- cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
- cf.delete(DeletionTime.serializer.deserialize(file));
- }
- catch (Exception e)
- {
- throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
- }
- }
- else
- {
- cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
- cf.delete(indexEntry.deletionTime());
- }
-
- List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
- if (indexList.isEmpty())
- {
- readSimpleColumns(file, columns, result);
- }
- else
- {
- readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
- }
-
- // create an iterator view of the columns we read
- iter = result.iterator();
- }
-
- private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
- {
- Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
- int n = 0;
- while (atomIterator.hasNext())
- {
- OnDiskAtom column = atomIterator.next();
- if (column instanceof Cell)
- {
- if (columnNames.contains(column.name()))
- {
- result.add(column);
- if (++n >= columns.size())
- break;
- }
- }
- else
- {
- result.add(column);
- }
- }
- }
-
- @SuppressWarnings("resource")
- private void readIndexedColumns(CFMetaData metadata,
- FileDataInput file,
- SortedSet<CellName> columnNames,
- List<IndexHelper.IndexInfo> indexList,
- long basePosition,
- List<OnDiskAtom> result)
- throws IOException
- {
- /* get the various column ranges we have to read */
- CellNameType comparator = metadata.comparator;
- List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
- int lastIndexIdx = -1;
- for (CellName name : columnNames)
- {
- int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
- if (index < 0 || index == indexList.size())
- continue;
- IndexHelper.IndexInfo indexInfo = indexList.get(index);
- // Check that the index block does contain the column name and that we haven't inserted this block yet.
- if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
- continue;
-
- ranges.add(indexInfo);
- lastIndexIdx = index;
- }
-
- if (ranges.isEmpty())
- return;
-
- Iterator<CellName> toFetch = columnNames.iterator();
- CellName nextToFetch = toFetch.next();
- for (IndexHelper.IndexInfo indexInfo : ranges)
- {
- long positionToSeek = basePosition + indexInfo.offset;
-
- // With new promoted indexes, our first seek in the data file will happen at that point.
- if (file == null)
- file = createFileDataInput(positionToSeek);
-
- AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
- file.seek(positionToSeek);
- FileMark mark = file.mark();
- while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
- {
- int cmp = deserializer.compareNextTo(nextToFetch);
- if (cmp < 0)
- {
- // If it's a rangeTombstone, then we need to read it and include
- // it if it includes our target. Otherwise, we can skip it.
- if (deserializer.nextIsRangeTombstone())
- {
- RangeTombstone rt = (RangeTombstone)deserializer.readNext();
- if (comparator.compare(rt.max, nextToFetch) >= 0)
- result.add(rt);
- }
- else
- {
- deserializer.skipNext();
- }
- }
- else if (cmp == 0)
- {
- nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
- result.add(deserializer.readNext());
- }
- else
- nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
- }
- }
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- public ColumnFamily getColumnFamily()
- {
- return cf;
- }
-
- protected OnDiskAtom computeNext()
- {
- if (iter == null || !iter.hasNext())
- return endOfData();
- return iter.next();
- }
-
- public void close() throws IOException { }
-}
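
The three-way comparison loop in readIndexedColumns above is, at heart, a merge of two sorted sequences: the on-disk atoms of an index block and the requested cell names. The self-contained sketch below replays that logic over plain Strings (fetch and its types are illustrative only, not code from the tree).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

public class NameFetchExample
{
    static List<String> fetch(List<String> diskCells, SortedSet<String> wanted)
    {
        List<String> result = new ArrayList<>();
        Iterator<String> toFetch = wanted.iterator();
        String nextToFetch = toFetch.hasNext() ? toFetch.next() : null;

        int i = 0;
        while (i < diskCells.size() && nextToFetch != null)
        {
            int cmp = diskCells.get(i).compareTo(nextToFetch);
            if (cmp < 0)
            {
                i++; // cell sorts before the requested name: skip it
            }
            else if (cmp == 0)
            {
                result.add(diskCells.get(i++)); // exact match: keep the cell
                nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
            }
            else
            {
                // the cell overshot the requested name, so that name is absent;
                // retry the same cell against the next requested name
                nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<String> disk = Arrays.asList("a", "b", "c", "e", "f");
        SortedSet<String> wanted = new TreeSet<>(Arrays.asList("b", "d", "f"));
        System.out.println(fetch(disk, wanted)); // prints [b, f]; "d" is absent
    }
}

The real code adds one wrinkle on top of this merge: a range tombstone that sorts before the target is still read (not skipped) when its end covers the target.
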
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
deleted file mode 100644
index 07d867d..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-
-/**
- * A Cell Iterator over SSTable
- */
-class SSTableSliceIterator implements OnDiskAtomIterator
-{
- private final OnDiskAtomIterator reader;
- private final DecoratedKey key;
-
- public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed)
- {
- this.key = key;
- RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
- this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed);
- }
-
- /**
- * An iterator for a slice within an SSTable
- * @param sstable The SSTable we are reading from
- * @param file Optional parameter that input is read from. If null is passed, this class creates an appropriate one automatically.
- * If this class creates one, it will close the underlying file when #close() is called.
- * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file)
- * In all cases the caller should explicitly #close() this iterator.
- * @param key The key the requested slice resides under
- * @param slices the column slices
- * @param reversed Results are returned in reverse order iff reversed is true.
- * @param indexEntry position of the row
- */
- public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry)
- {
- this.key = key;
- reader = createReader(sstable, indexEntry, file, slices, reversed);
- }
-
- private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
- {
- return slices.length == 1 && slices[0].start.isEmpty() && !reversed
- ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish)
- : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed);
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- public ColumnFamily getColumnFamily()
- {
- return reader == null ? null : reader.getColumnFamily();
- }
-
- public boolean hasNext()
- {
- return reader != null && reader.hasNext();
- }
-
- public OnDiskAtom next()
- {
- return reader.next();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close() throws IOException
- {
- if (reader != null)
- reader.close();
- }
-
-}
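
The constructor javadoc above spells out a file-ownership rule that is easy to misuse, so here is a hedged usage sketch. Assume sstable, key, slices, indexEntry and a process() helper are in scope; none of this is code from the commit.

// Case 1: no file supplied. The iterator opens its own FileDataInput and
// close() releases it.
SSTableSliceIterator managed = new SSTableSliceIterator(sstable, key, slices, false);
try
{
    while (managed.hasNext())
        process(managed.next());
}
finally
{
    managed.close(); // closes the file the iterator opened
}

// Case 2: the caller supplies the file and stays responsible for closing it.
FileDataInput file = sstable.getFileDataInput(indexEntry.position);
try
{
    SSTableSliceIterator shared = new SSTableSliceIterator(sstable, file, key, slices, false, indexEntry);
    while (shared.hasNext())
        process(shared.next());
    shared.close(); // does NOT close 'file'
}
finally
{
    file.close();
}
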
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
deleted file mode 100644
index 9fec303..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
- private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
-
- private final FileDataInput file;
- private final boolean needsClosing;
- private final Composite finishColumn;
- private final CellNameType comparator;
- private final ColumnFamily emptyColumnFamily;
- private final Iterator<OnDiskAtom> atomIterator;
-
- SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn)
- {
- Tracing.trace("Seeking to partition beginning in data file");
- this.finishColumn = finishColumn;
- this.comparator = sstable.metadata.comparator;
- try
- {
- if (input == null)
- {
- this.file = sstable.getFileDataInput(indexEntry.position);
- this.needsClosing = true;
- }
- else
- {
- this.file = input;
- input.seek(indexEntry.position);
- this.needsClosing = false;
- }
-
- // Skip key and data size
- ByteBufferUtil.skipShortLength(file);
-
- emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
- emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
- atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new CorruptSSTableException(e, sstable.getFilename());
- }
- }
-
- protected OnDiskAtom computeNext()
- {
- if (!atomIterator.hasNext())
- return endOfData();
-
- OnDiskAtom column = atomIterator.next();
- if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0)
- return endOfData();
-
- return column;
- }
-
- public ColumnFamily getColumnFamily()
- {
- return emptyColumnFamily;
- }
-
- public void close() throws IOException
- {
- if (needsClosing)
- file.close();
- }
-
- public DecoratedKey getKey()
- {
- throw new UnsupportedOperationException();
- }
-}
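
Like the other column iterators removed in this commit, SimpleSliceReader is built on Guava's AbstractIterator: computeNext() either returns the next element or calls endOfData() to signal exhaustion, and hasNext()/next() are derived from it. A minimal self-contained illustration of that pattern (not code from the tree):

import com.google.common.collect.AbstractIterator;

public class AbstractIteratorExample
{
    // Iterates 0..limit-1, terminating via endOfData() exactly the way
    // SimpleSliceReader.computeNext() stops past the end of its slice.
    static final class CountingIterator extends AbstractIterator<Integer>
    {
        private final int limit;
        private int next = 0;

        CountingIterator(int limit)
        {
            this.limit = limit;
        }

        protected Integer computeNext()
        {
            if (next >= limit)
                return endOfData();
            return next++;
        }
    }

    public static void main(String[] args)
    {
        CountingIterator it = new CountingIterator(3);
        while (it.hasNext())
            System.out.println(it.next()); // prints 0, 1, 2
    }
}
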
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4bd060e..90a9f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -23,6 +23,7 @@ import java.util.*;
import com.google.common.collect.Maps;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -64,12 +65,12 @@ public class LegacyMetadataSerializer extends MetadataSerializer
out.writeInt(g);
StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
out.writeInt(stats.sstableLevel);
- out.writeInt(stats.minColumnNames.size());
- for (ByteBuffer columnName : stats.minColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- out.writeInt(stats.maxColumnNames.size());
- for (ByteBuffer columnName : stats.maxColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
+ out.writeInt(stats.minClusteringValues.size());
+ for (ByteBuffer value : stats.minClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
+ out.writeInt(stats.maxClusteringValues.size());
+ for (ByteBuffer value : stats.maxClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
}
/**
@@ -127,14 +128,19 @@ public class LegacyMetadataSerializer extends MetadataSerializer
replayPosition,
minTimestamp,
maxTimestamp,
+ Integer.MAX_VALUE,
maxLocalDeletionTime,
+ 0,
+ Integer.MAX_VALUE,
compressionRatio,
tombstoneHistogram,
sstableLevel,
minColumnNames,
maxColumnNames,
true,
- ActiveRepairService.UNREPAIRED_SSTABLE));
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ -1,
+ -1));
if (types.contains(MetadataType.COMPACTION))
components.put(MetadataType.COMPACTION,
new CompactionMetadata(ancestors, null));
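
Both the legacy serializer above and StatsMetadata further down write each min/max clustering value with ByteBufferUtil.writeWithShortLength, i.e. a 2-byte length prefix followed by the raw bytes; this is the framing that the "2 + value.remaining()" size accounting in StatsMetadata assumes. A self-contained sketch of that format (the helper below is a stand-in assumed to match the real ByteBufferUtil behavior):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ShortLengthExample
{
    // Assumed equivalent of ByteBufferUtil.writeWithShortLength: a 2-byte
    // big-endian length, then the buffer's remaining bytes.
    static void writeWithShortLength(ByteBuffer value, DataOutputStream out) throws IOException
    {
        out.writeShort(value.remaining());
        byte[] bytes = new byte[value.remaining()];
        value.duplicate().get(bytes);
        out.write(bytes);
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        writeWithShortLength(ByteBuffer.wrap(new byte[]{ 1, 2, 3 }), out);
        System.out.println(baos.size()); // 5 = 2 (length prefix) + 3 (payload)
    }
}
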
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 5962a46..2574c62 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,22 +19,20 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -47,13 +45,13 @@ public class MetadataCollector
{
public static final double NO_COMPRESSION_RATIO = -1.0;
- static EstimatedHistogram defaultColumnCountHistogram()
+ static EstimatedHistogram defaultCellPerPartitionCountHistogram()
{
// EH of 114 can track a max value of 2395318855, i.e., > 2B columns
return new EstimatedHistogram(114);
}
- static EstimatedHistogram defaultRowSizeHistogram()
+ static EstimatedHistogram defaultPartitionSizeHistogram()
{
// EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
return new EstimatedHistogram(150);
@@ -66,34 +64,42 @@ public class MetadataCollector
public static StatsMetadata defaultStatsMetadata()
{
- return new StatsMetadata(defaultRowSizeHistogram(),
- defaultColumnCountHistogram(),
+ return new StatsMetadata(defaultPartitionSizeHistogram(),
+ defaultCellPerPartitionCountHistogram(),
ReplayPosition.NONE,
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ 0,
+ Integer.MAX_VALUE,
NO_COMPRESSION_RATIO,
defaultTombstoneDropTimeHistogram(),
0,
Collections.<ByteBuffer>emptyList(),
Collections.<ByteBuffer>emptyList(),
true,
- ActiveRepairService.UNREPAIRED_SSTABLE);
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ -1,
+ -1);
}
- protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
- protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
+ protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
+ // TODO: count the number of rows per partition (either along with the number of cells, or instead)
+ protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
protected ReplayPosition replayPosition = ReplayPosition.NONE;
- protected long minTimestamp = Long.MAX_VALUE;
- protected long maxTimestamp = Long.MIN_VALUE;
- protected int maxLocalDeletionTime = Integer.MIN_VALUE;
+ protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
+ protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_DELETION_TIME);
+ protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(LivenessInfo.NO_TTL, LivenessInfo.NO_TTL);
protected double compressionRatio = NO_COMPRESSION_RATIO;
protected Set<Integer> ancestors = new HashSet<>();
protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
protected int sstableLevel;
- protected List<ByteBuffer> minColumnNames = Collections.emptyList();
- protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ protected ByteBuffer[] minClusteringValues;
+ protected ByteBuffer[] maxClusteringValues;
protected boolean hasLegacyCounterShards = false;
+ protected long totalColumnsSet;
+ protected long totalRows;
/**
* Default cardinality estimation method is to use HyperLogLog++.
@@ -102,16 +108,19 @@ public class MetadataCollector
* See CASSANDRA-5906 for detail.
*/
protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
- private final CellNameType columnNameComparator;
+ private final ClusteringComparator comparator;
- public MetadataCollector(CellNameType columnNameComparator)
+ public MetadataCollector(ClusteringComparator comparator)
{
- this.columnNameComparator = columnNameComparator;
+ this.comparator = comparator;
+
+ this.minClusteringValues = new ByteBuffer[comparator.size()];
+ this.maxClusteringValues = new ByteBuffer[comparator.size()];
}
- public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level, boolean skipAncestors)
+ public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level, boolean skipAncestors)
{
- this(columnNameComparator);
+ this(comparator);
replayPosition(ReplayPosition.getReplayPosition(sstables));
sstableLevel(level);
@@ -129,9 +138,9 @@ public class MetadataCollector
}
}
- public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level)
+ public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
{
- this(sstables, columnNameComparator, level, false);
+ this(sstables, comparator, level, false);
}
public MetadataCollector addKey(ByteBuffer key)
@@ -141,15 +150,15 @@ public class MetadataCollector
return this;
}
- public MetadataCollector addRowSize(long rowSize)
+ public MetadataCollector addPartitionSizeInBytes(long partitionSize)
{
- estimatedRowSize.add(rowSize);
+ estimatedPartitionSize.add(partitionSize);
return this;
}
- public MetadataCollector addColumnCount(long columnCount)
+ public MetadataCollector addCellPerPartitionCount(long cellCount)
{
- estimatedColumnCount.add(columnCount);
+ estimatedCellPerPartitionCount.add(cellCount);
return this;
}
@@ -169,34 +178,50 @@ public class MetadataCollector
return this;
}
- public MetadataCollector updateMinTimestamp(long potentialMin)
+ public MetadataCollector update(LivenessInfo newInfo)
{
- minTimestamp = Math.min(minTimestamp, potentialMin);
+ // If the info doesn't have a timestamp, it is basically irrelevant (it's a row
+ // update whose only info we care about is its cells' info).
+ if (newInfo.hasTimestamp())
+ {
+ updateTimestamp(newInfo.timestamp());
+ updateTTL(newInfo.ttl());
+ updateLocalDeletionTime(newInfo.localDeletionTime());
+ }
return this;
}
- public MetadataCollector updateMaxTimestamp(long potentialMax)
+ public MetadataCollector update(DeletionTime dt)
{
- maxTimestamp = Math.max(maxTimestamp, potentialMax);
+ if (!dt.isLive())
+ {
+ updateTimestamp(dt.markedForDeleteAt());
+ updateLocalDeletionTime(dt.localDeletionTime());
+ }
return this;
}
- public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+ public MetadataCollector updateColumnSetPerRow(long columnSetInRow)
{
- this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+ totalColumnsSet += columnSetInRow;
+ ++totalRows;
return this;
}
- public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+ private void updateTimestamp(long newTimestamp)
{
- this.estimatedRowSize = estimatedRowSize;
- return this;
+ timestampTracker.update(newTimestamp);
}
- public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+ private void updateLocalDeletionTime(int newLocalDeletionTime)
{
- this.estimatedColumnCount = estimatedColumnCount;
- return this;
+ localDeletionTimeTracker.update(newLocalDeletionTime);
+ estimatedTombstoneDropTime.update(newLocalDeletionTime);
+ }
+
+ private void updateTTL(int newTTL)
+ {
+ ttlTracker.update(newTTL);
}
public MetadataCollector replayPosition(ReplayPosition replayPosition)
@@ -217,58 +242,179 @@ public class MetadataCollector
return this;
}
- public MetadataCollector updateMinColumnNames(List<ByteBuffer> minColumnNames)
+ public MetadataCollector updateClusteringValues(ClusteringPrefix clustering)
{
- if (minColumnNames.size() > 0)
- this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
+ int size = clustering.size();
+ for (int i = 0; i < size; i++)
+ {
+ AbstractType<?> type = comparator.subtype(i);
+ ByteBuffer newValue = clustering.get(i);
+ minClusteringValues[i] = min(minClusteringValues[i], newValue, type);
+ maxClusteringValues[i] = max(maxClusteringValues[i], newValue, type);
+ }
return this;
}
- public MetadataCollector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
+ private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
{
- if (maxColumnNames.size() > 0)
- this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
- return this;
+ if (b1 == null)
+ return b2;
+ if (b2 == null)
+ return b1;
+
+ if (comparator.compare(b1, b2) >= 0)
+ return b2;
+ return b1;
}
- public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+ private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
{
- this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
- return this;
+ if (b1 == null)
+ return b2;
+ if (b2 == null)
+ return b1;
+
+ if (comparator.compare(b1, b2) >= 0)
+ return b1;
+ return b2;
}
- public MetadataCollector update(long rowSize, ColumnStats stats)
+ public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
{
- updateMinTimestamp(stats.minTimestamp);
- updateMaxTimestamp(stats.maxTimestamp);
- updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
- addRowSize(rowSize);
- addColumnCount(stats.columnCount);
- mergeTombstoneHistogram(stats.tombstoneHistogram);
- updateMinColumnNames(stats.minColumnNames);
- updateMaxColumnNames(stats.maxColumnNames);
- updateHasLegacyCounterShards(stats.hasLegacyCounterShards);
+ this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
return this;
}
- public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt)
+ public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header)
{
Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
- components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
- estimatedColumnCount,
+ components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
+ estimatedCellPerPartitionCount,
replayPosition,
- minTimestamp,
- maxTimestamp,
- maxLocalDeletionTime,
+ timestampTracker.min(),
+ timestampTracker.max(),
+ localDeletionTimeTracker.min(),
+ localDeletionTimeTracker.max(),
+ ttlTracker.min(),
+ ttlTracker.max(),
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
- ImmutableList.copyOf(minColumnNames),
- ImmutableList.copyOf(maxColumnNames),
+ makeList(minClusteringValues),
+ makeList(maxClusteringValues),
hasLegacyCounterShards,
- repairedAt));
+ repairedAt,
+ totalColumnsSet,
+ totalRows));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
+ components.put(MetadataType.HEADER, header.toComponent());
return components;
}
+
+ private static List<ByteBuffer> makeList(ByteBuffer[] values)
+ {
+ // In most cases, l will be the same size as values, but it's possible for it to be smaller
+ List<ByteBuffer> l = new ArrayList<ByteBuffer>(values.length);
+ for (int i = 0; i < values.length; i++)
+ if (values[i] == null)
+ break;
+ else
+ l.add(values[i]);
+ return l;
+ }
+
+ public static class MinMaxLongTracker
+ {
+ private final long defaultMin;
+ private final long defaultMax;
+
+ private boolean isSet = false;
+ private long min;
+ private long max;
+
+ public MinMaxLongTracker()
+ {
+ this(Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ public MinMaxLongTracker(long defaultMin, long defaultMax)
+ {
+ this.defaultMin = defaultMin;
+ this.defaultMax = defaultMax;
+ }
+
+ public void update(long value)
+ {
+ if (!isSet)
+ {
+ min = max = value;
+ isSet = true;
+ }
+ else
+ {
+ if (value < min)
+ min = value;
+ if (value > max)
+ max = value;
+ }
+ }
+
+ public long min()
+ {
+ return isSet ? min : defaultMin;
+ }
+
+ public long max()
+ {
+ return isSet ? max : defaultMax;
+ }
+ }
+
+ public static class MinMaxIntTracker
+ {
+ private final int defaultMin;
+ private final int defaultMax;
+
+ private boolean isSet = false;
+ private int min;
+ private int max;
+
+ public MinMaxIntTracker()
+ {
+ this(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ }
+
+ public MinMaxIntTracker(int defaultMin, int defaultMax)
+ {
+ this.defaultMin = defaultMin;
+ this.defaultMax = defaultMax;
+ }
+
+ public void update(int value)
+ {
+ if (!isSet)
+ {
+ min = max = value;
+ isSet = true;
+ }
+ else
+ {
+ if (value < min)
+ min = value;
+ if (value > max)
+ max = value;
+ }
+ }
+
+ public int min()
+ {
+ return isSet ? min : defaultMin;
+ }
+
+ public int max()
+ {
+ return isSet ? max : defaultMax;
+ }
+ }
}
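
The new MinMaxIntTracker and MinMaxLongTracker report their configured defaults until the first update(), which is how untouched stats fall back to sentinels such as LivenessInfo.NO_TTL in the collector above. A short usage sketch of the class as added by this patch:

import org.apache.cassandra.io.sstable.metadata.MetadataCollector.MinMaxIntTracker;

public class TrackerExample
{
    public static void main(String[] args)
    {
        // Until update() is called, min()/max() return the defaults (-1 here).
        MinMaxIntTracker tracker = new MinMaxIntTracker(-1, -1);
        System.out.println(tracker.min() + "/" + tracker.max()); // -1/-1

        tracker.update(7);
        tracker.update(3);
        System.out.println(tracker.min() + "/" + tracker.max()); // 3/7
    }
}
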
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 8a65d8d..fcdf57a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -75,7 +75,7 @@ public class MetadataSerializer implements IMetadataSerializer
}
}
- public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+ public Map<MetadataType, MetadataComponent> deserialize( Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
{
Map<MetadataType, MetadataComponent> components;
logger.debug("Load metadata for {}", descriptor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
index 9717da1..875cec4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.io.sstable.metadata;
+import org.apache.cassandra.db.SerializationHeader;
+
/**
* Defines Metadata component type.
*/
@@ -27,7 +29,9 @@ public enum MetadataType
/** Metadata only used at compaction */
COMPACTION(CompactionMetadata.serializer),
/** Metadata always keep in memory */
- STATS(StatsMetadata.serializer);
+ STATS(StatsMetadata.serializer),
+ /** Serialization header */
+ HEADER((IMetadataComponentSerializer)SerializationHeader.serializer);
public final IMetadataComponentSerializer<MetadataComponent> serializer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index f2eb1af..809d6b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -46,42 +46,57 @@ public class StatsMetadata extends MetadataComponent
public final ReplayPosition replayPosition;
public final long minTimestamp;
public final long maxTimestamp;
+ public final int minLocalDeletionTime;
public final int maxLocalDeletionTime;
+ public final int minTTL;
+ public final int maxTTL;
public final double compressionRatio;
public final StreamingHistogram estimatedTombstoneDropTime;
public final int sstableLevel;
- public final List<ByteBuffer> maxColumnNames;
- public final List<ByteBuffer> minColumnNames;
+ public final List<ByteBuffer> minClusteringValues;
+ public final List<ByteBuffer> maxClusteringValues;
public final boolean hasLegacyCounterShards;
public final long repairedAt;
+ public final long totalColumnsSet;
+ public final long totalRows;
public StatsMetadata(EstimatedHistogram estimatedRowSize,
EstimatedHistogram estimatedColumnCount,
ReplayPosition replayPosition,
long minTimestamp,
long maxTimestamp,
+ int minLocalDeletionTime,
int maxLocalDeletionTime,
+ int minTTL,
+ int maxTTL,
double compressionRatio,
StreamingHistogram estimatedTombstoneDropTime,
int sstableLevel,
- List<ByteBuffer> minColumnNames,
- List<ByteBuffer> maxColumnNames,
+ List<ByteBuffer> minClusteringValues,
+ List<ByteBuffer> maxClusteringValues,
boolean hasLegacyCounterShards,
- long repairedAt)
+ long repairedAt,
+ long totalColumnsSet,
+ long totalRows)
{
this.estimatedRowSize = estimatedRowSize;
this.estimatedColumnCount = estimatedColumnCount;
this.replayPosition = replayPosition;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
+ this.minLocalDeletionTime = minLocalDeletionTime;
this.maxLocalDeletionTime = maxLocalDeletionTime;
+ this.minTTL = minTTL;
+ this.maxTTL = maxTTL;
this.compressionRatio = compressionRatio;
this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
this.sstableLevel = sstableLevel;
- this.minColumnNames = minColumnNames;
- this.maxColumnNames = maxColumnNames;
+ this.minClusteringValues = minClusteringValues;
+ this.maxClusteringValues = maxClusteringValues;
this.hasLegacyCounterShards = hasLegacyCounterShards;
this.repairedAt = repairedAt;
+ this.totalColumnsSet = totalColumnsSet;
+ this.totalRows = totalRows;
}
public MetadataType getType()
@@ -120,14 +135,19 @@ public class StatsMetadata extends MetadataComponent
replayPosition,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
estimatedTombstoneDropTime,
newLevel,
- minColumnNames,
- maxColumnNames,
+ minClusteringValues,
+ maxClusteringValues,
hasLegacyCounterShards,
- repairedAt);
+ repairedAt,
+ totalColumnsSet,
+ totalRows);
}
public StatsMetadata mutateRepairedAt(long newRepairedAt)
@@ -137,14 +157,19 @@ public class StatsMetadata extends MetadataComponent
replayPosition,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
- minColumnNames,
- maxColumnNames,
+ minClusteringValues,
+ maxClusteringValues,
hasLegacyCounterShards,
- newRepairedAt);
+ newRepairedAt,
+ totalColumnsSet,
+ totalRows);
}
@Override
@@ -160,14 +185,19 @@ public class StatsMetadata extends MetadataComponent
.append(replayPosition, that.replayPosition)
.append(minTimestamp, that.minTimestamp)
.append(maxTimestamp, that.maxTimestamp)
+ .append(minLocalDeletionTime, that.minLocalDeletionTime)
.append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+ .append(minTTL, that.minTTL)
+ .append(maxTTL, that.maxTTL)
.append(compressionRatio, that.compressionRatio)
.append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
.append(sstableLevel, that.sstableLevel)
.append(repairedAt, that.repairedAt)
- .append(maxColumnNames, that.maxColumnNames)
- .append(minColumnNames, that.minColumnNames)
+ .append(maxClusteringValues, that.maxClusteringValues)
+ .append(minClusteringValues, that.minClusteringValues)
.append(hasLegacyCounterShards, that.hasLegacyCounterShards)
+ .append(totalColumnsSet, that.totalColumnsSet)
+ .append(totalRows, that.totalRows)
.build();
}
@@ -180,14 +210,19 @@ public class StatsMetadata extends MetadataComponent
.append(replayPosition)
.append(minTimestamp)
.append(maxTimestamp)
+ .append(minLocalDeletionTime)
.append(maxLocalDeletionTime)
+ .append(minTTL)
+ .append(maxTTL)
.append(compressionRatio)
.append(estimatedTombstoneDropTime)
.append(sstableLevel)
.append(repairedAt)
- .append(maxColumnNames)
- .append(minColumnNames)
+ .append(maxClusteringValues)
+ .append(minClusteringValues)
.append(hasLegacyCounterShards)
+ .append(totalColumnsSet)
+ .append(totalRows)
.build();
}
@@ -199,18 +234,19 @@ public class StatsMetadata extends MetadataComponent
size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
- size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
+ size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
// min column names
size += 4;
- for (ByteBuffer columnName : component.minColumnNames)
- size += 2 + columnName.remaining(); // with short length
+ for (ByteBuffer value : component.minClusteringValues)
+ size += 2 + value.remaining(); // with short length
// max column names
size += 4;
- for (ByteBuffer columnName : component.maxColumnNames)
- size += 2 + columnName.remaining(); // with short length
+ for (ByteBuffer value : component.maxClusteringValues)
+ size += 2 + value.remaining(); // with short length
size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+ size += 8 + 8; // totalColumnsSet, totalRows
return size;
}
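As a quick check on the updated accounting, the fixed-width block now covers eight fields; a one-line sketch of the arithmetic (not part of the patch):

    // min/max timestamp (2 longs) + min/max localDeletionTime (2 ints)
    // + min/max TTL (2 ints) + compressionRatio (double) + repairedAt (long)
    static int fixedWidthBytes() { return 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; } // = 48 bytes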
@@ -221,18 +257,24 @@ public class StatsMetadata extends MetadataComponent
ReplayPosition.serializer.serialize(component.replayPosition, out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
+ out.writeInt(component.minLocalDeletionTime);
out.writeInt(component.maxLocalDeletionTime);
+ out.writeInt(component.minTTL);
+ out.writeInt(component.maxTTL);
out.writeDouble(component.compressionRatio);
StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
out.writeInt(component.sstableLevel);
out.writeLong(component.repairedAt);
- out.writeInt(component.minColumnNames.size());
- for (ByteBuffer columnName : component.minColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- out.writeInt(component.maxColumnNames.size());
- for (ByteBuffer columnName : component.maxColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
+ out.writeInt(component.minClusteringValues.size());
+ for (ByteBuffer value : component.minClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
+ out.writeInt(component.maxClusteringValues.size());
+ for (ByteBuffer value : component.maxClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
out.writeBoolean(component.hasLegacyCounterShards);
+
+ out.writeLong(component.totalColumnsSet);
+ out.writeLong(component.totalRows);
}
public StatsMetadata deserialize(Version version, DataInput in) throws IOException
@@ -242,7 +284,11 @@ public class StatsMetadata extends MetadataComponent
ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
+ // We use MAX_VALUE as that's the default value for "no deletion time"
+ int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
int maxLocalDeletionTime = in.readInt();
+ int minTTL = version.storeRows() ? in.readInt() : 0;
+ int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
double compressionRatio = in.readDouble();
StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
int sstableLevel = in.readInt();
@@ -251,32 +297,40 @@ public class StatsMetadata extends MetadataComponent
repairedAt = in.readLong();
int colCount = in.readInt();
- List<ByteBuffer> minColumnNames = new ArrayList<>(colCount);
+ List<ByteBuffer> minClusteringValues = new ArrayList<>(colCount);
for (int i = 0; i < colCount; i++)
- minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+ minClusteringValues.add(ByteBufferUtil.readWithShortLength(in));
colCount = in.readInt();
- List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
+ List<ByteBuffer> maxClusteringValues = new ArrayList<>(colCount);
for (int i = 0; i < colCount; i++)
- maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+ maxClusteringValues.add(ByteBufferUtil.readWithShortLength(in));
boolean hasLegacyCounterShards = true;
if (version.tracksLegacyCounterShards())
hasLegacyCounterShards = in.readBoolean();
+ long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
+ long totalRows = version.storeRows() ? in.readLong() : -1L;
+
return new StatsMetadata(rowSizes,
columnCounts,
replayPosition,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
tombstoneHistogram,
sstableLevel,
- minColumnNames,
- maxColumnNames,
+ minClusteringValues,
+ maxClusteringValues,
hasLegacyCounterShards,
- repairedAt);
+ repairedAt,
+ totalColumnsSet,
+ totalRows);
}
}
}
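The deserialize() changes above all follow one backward-compatibility pattern: fields added by the new format are read only when version.storeRows() reports a post-refactor sstable, and older files get sentinel defaults (Integer.MAX_VALUE for the "no deletion time" bound, 0 and Integer.MAX_VALUE for the TTL bounds, -1 for the two counts). A minimal sketch of the pattern; the class and method names are hypothetical, only the storeRows gate and the -1 sentinel come from the patch:

    import java.io.DataInput;
    import java.io.IOException;

    final class VersionGatedReadSketch
    {
        static long readTotalRows(DataInput in, boolean storeRows) throws IOException
        {
            // New-format sstables persist the count as a trailing long;
            // pre-3.0 files never wrote it, so -1 stands for "unknown".
            return storeRows ? in.readLong() : -1L;
        }
    }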
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..c3a7f98 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -112,10 +112,10 @@ public class DataIntegrityMetadata
}
catch (Exception e)
{
+ close();
// Attempting to create a FileDigestValidator without a DIGEST file will fail
throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
}
-
}
// Validate the entire file
@@ -133,7 +133,14 @@ public class DataIntegrityMetadata
public void close()
{
- this.digestReader.close();
+ try
+ {
+ this.digestReader.close();
+ }
+ finally
+ {
+ this.dataReader.close();
+ }
}
}
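The reworked close() chains the two closes with try/finally so that a failure closing digestReader can no longer leak dataReader. A generic sketch of the idiom, with illustrative names; if both closes throw, the exception from the finally block wins, matching the patched method:

    import java.io.Closeable;
    import java.io.IOException;

    final class CloseBothSketch
    {
        static void closeBoth(Closeable first, Closeable second) throws IOException
        {
            try
            {
                first.close();
            }
            finally
            {
                second.close(); // always attempted, even if first.close() threw
            }
        }
    }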
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 35e1419..c182e58 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -227,6 +227,19 @@ public class FileUtils
}
}
+ public static void closeQuietly(AutoCloseable c)
+ {
+ try
+ {
+ if (c != null)
+ c.close();
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed closing {}", c, e);
+ }
+ }
+
public static void close(Closeable... cs) throws IOException
{
close(Arrays.asList(cs));
@@ -252,6 +265,22 @@ public class FileUtils
throw e;
}
+ public static void closeQuietly(Iterable<? extends AutoCloseable> cs)
+ {
+ for (AutoCloseable c : cs)
+ {
+ try
+ {
+ if (c != null)
+ c.close();
+ }
+ catch (Exception ex)
+ {
+ logger.warn("Failed closing {}", c, ex);
+ }
+ }
+ }
+
public static String getCanonicalPath(String filename)
{
try
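The new closeQuietly overloads accept any AutoCloseable (singly or as a collection), tolerate nulls, and log failures instead of throwing, which suits cleanup paths where an exception from close() would mask the original error. A hedged usage sketch; 'a' and 'b' stand in for real resources:

    import java.util.Arrays;

    final class CloseQuietlySketch
    {
        static void cleanup(AutoCloseable a, AutoCloseable b)
        {
            // A null entry or a throwing close() is logged by FileUtils,
            // never rethrown, so an original failure stays visible.
            FileUtils.closeQuietly(a);                   // single resource, null-safe
            FileUtils.closeQuietly(Arrays.asList(a, b)); // whole collection
        }
    }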
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 83bc337..3f2160f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -81,7 +81,8 @@ public final class MessagingService implements MessagingServiceMBean
public static final int VERSION_20 = 7;
public static final int VERSION_21 = 8;
public static final int VERSION_22 = 9;
- public static final int current_version = VERSION_22;
+ public static final int VERSION_30 = 10;
+ public static final int current_version = VERSION_30;
public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
public static final byte[] ONE_BYTE = new byte[1];
@@ -104,7 +105,7 @@ public final class MessagingService implements MessagingServiceMBean
@Deprecated STREAM_INITIATE_DONE,
@Deprecated STREAM_REPLY,
@Deprecated STREAM_REQUEST,
- RANGE_SLICE,
+ @Deprecated RANGE_SLICE,
@Deprecated BOOTSTRAP_TOKEN,
@Deprecated TREE_REQUEST,
@Deprecated TREE_RESPONSE,
@@ -132,7 +133,7 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PREPARE,
PAXOS_PROPOSE,
PAXOS_COMMIT,
- PAGED_RANGE,
+ @Deprecated PAGED_RANGE,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
@@ -204,8 +205,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MUTATION, Mutation.serializer);
put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
- put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
- put(Verb.PAGED_RANGE, PagedRangeCommand.serializer);
+ //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+ //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -230,8 +231,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MUTATION, WriteResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
- put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
- put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
+ put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
+ put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
put(Verb.READ, ReadResponse.serializer);
put(Verb.TRUNCATE, TruncateResponse.serializer);
put(Verb.SNAPSHOT, null);
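RANGE_SLICE and PAGED_RANGE are deprecated rather than deleted because, as the in-code comment warns, verbs are serialized by ordinal: removing an entry would shift every later verb and misroute traffic from nodes running an older enum. A toy illustration (the real Verb list is much longer):

    // Verbs travel on the wire as enum ordinals, so positions are frozen.
    enum VerbSketch { MUTATION, RANGE_SLICE, READ }

    final class OrdinalWireSketch
    {
        static VerbSketch decode(int onWire)
        {
            // Deleting RANGE_SLICE (instead of deprecating it) would shift
            // READ from ordinal 2 to 1 and desynchronize mixed-version nodes.
            return VerbSketch.values()[onWire];
        }
    }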
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 754e26f..ac20428 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -180,7 +180,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
logger.info("[repair #{}] {}", desc.sessionId, message);
Tracing.traceRepair(message);
- int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+ int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
for (InetAddress endpoint : endpoints)
{
@@ -197,7 +197,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
*/
private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints)
{
- int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+ int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
Queue<InetAddress> requests = new LinkedList<>(endpoints);
@@ -236,7 +236,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
*/
private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
{
- int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+ int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
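The three gcBefore call sites switch from the millisecond clock to FBUtilities.nowInSeconds(): the refactored gcBefore takes its cutoff in whole seconds, matching how the new engine tracks expiration times. For reference, a sketch of the relationship between the two clocks; the division shown is the conventional definition, assumed rather than copied from FBUtilities:

    final class NowInSecondsSketch
    {
        static int nowInSeconds()
        {
            // Whole seconds since the epoch; fits in an int until 2038.
            return (int) (System.currentTimeMillis() / 1000);
        }
    }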