You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/26 06:30:44 UTC
svn commit: r1150984 - in /cassandra/trunk:
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/security/streaming/
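src/java/org/apache/cassandra/streaming/
test/long/org/apache/cassandra/db/compaction/
test/unit/org/apache/cassandra/io/sstable/
test/unit/org/apache/cassandra/streaming/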
Author: jbellis
Date: Tue Jul 26 04:30:43 2011
New Revision: 1150984
URL: http://svn.apache.org/viewvc?rev=1150984&view=rev
Log:
Remove SSTableWriter.Builder
patch by jbellis and stuhood; reviewed by Yuki Morishita for CASSANDRA-2920
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jul 26 04:30:43 2011
@@ -857,39 +857,6 @@ public class CompactionManager implement
return executor.submit(runnable);
}
- /**
- * Submits an sstable to be rebuilt: is not scheduled, since the sstable must not exist.
- */
- public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
- {
- // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
- final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
- Callable<SSTableReader> callable = new Callable<SSTableReader>()
- {
- public SSTableReader call() throws IOException
- {
- compactionLock.readLock().lock();
- try
- {
- executor.beginCompaction(builder);
- try
- {
- return builder.build();
- }
- finally
- {
- executor.finishCompaction(builder);
- }
- }
- finally
- {
- compactionLock.readLock().unlock();
- }
- }
- };
- return executor.submit(callable);
- }
-
public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
{
Runnable runnable = new WrappedRunnable()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 26 04:30:43 2011
@@ -170,6 +170,11 @@ public class SSTableWriter extends SSTab
afterAppend(decoratedKey, currentPosition);
}
+ public void updateMaxTimestamp(long timestamp)
+ {
+ sstableMetadataCollector.updateMaxTimestamp(timestamp);
+ }
+
/**
* Attempt to close the index writer and data file before deleting all temp components for the sstable
*/
@@ -258,351 +263,6 @@ public class SSTableWriter extends SSTab
{
return dataFile.getFilePointer();
}
-
- public static Builder createBuilder(Descriptor desc, OperationType type)
- {
- if (!desc.isLatestVersion)
- // TODO: streaming between different versions will fail: need support for
- // recovering other versions to provide a stable streaming api
- throw new RuntimeException(String.format("Cannot recover SSTable %s due to version mismatch. (current version is %s).", desc.toString()
- , Descriptor.CURRENT_VERSION));
-
- return new Builder(desc, type);
- }
-
- /**
- * Removes the given SSTable from temporary status and opens it, rebuilding the
- * bloom filter and row index from the data file.
- *
- * TODO remove this post-1.0, we have one-pass streaming now (see IncomingStreamReader)
- */
- public static class Builder implements CompactionInfo.Holder
- {
- private final Descriptor desc;
- private final OperationType type;
- private final ColumnFamilyStore cfs;
- private RowIndexer indexer;
-
- public Builder(Descriptor desc, OperationType type)
- {
- this.desc = desc;
- this.type = type;
- cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
- }
-
- public CompactionInfo getCompactionInfo()
- {
- maybeOpenIndexer();
- try
- {
- // both file offsets are still valid post-close
- return new CompactionInfo(desc.ksname,
- desc.cfname,
- CompactionType.SSTABLE_BUILD,
- indexer.dfile.getFilePointer(),
- indexer.dfile.length());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
- // since the 8MB buffers can use up heap quickly
- private void maybeOpenIndexer()
- {
- if (indexer != null)
- return;
- try
- {
- if (cfs.metadata.getDefaultValidator().isCommutative())
- indexer = new CommutativeRowIndexer(desc, cfs, type);
- else
- indexer = new RowIndexer(desc, cfs, type);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public SSTableReader build() throws IOException
- {
- try
- {
- if (cfs.isInvalid())
- return null;
- maybeOpenIndexer();
-
- File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
- File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
- assert !ifile.exists();
- assert !ffile.exists();
-
- long estimatedRows = indexer.prepareIndexing();
-
- // build the index and filter
- long rows = indexer.index();
-
- logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
- return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, Descriptor.TempState.ANY)));
- }
- finally
- {
- cleanupIfNecessary();
- }
- }
-
- /**
- * Attempt to close the index writer before deleting all temp components for the sstable
- */
- public void cleanupIfNecessary()
- {
- FileUtils.closeQuietly(indexer);
-
- try
- {
- Set<Component> components = SSTable.componentsFor(desc, Descriptor.TempState.TEMP);
- if (!components.isEmpty())
- SSTable.delete(desc, components);
- }
- catch (Exception e)
- {
- logger.error(String.format("Failed deleting temp components for %s", desc), e);
- }
- }
-
- }
-
- static class RowIndexer implements Closeable
- {
- protected final Descriptor desc;
- public final RandomAccessReader dfile;
- private final OperationType type;
-
- protected IndexWriter iwriter;
- protected ColumnFamilyStore cfs;
- protected final SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector();
-
- RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
- {
- this(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
- }
-
- protected RowIndexer(Descriptor desc, RandomAccessReader dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
- {
- this.desc = desc;
- this.dfile = dfile;
- this.type = type;
- this.cfs = cfs;
- }
-
- long prepareIndexing() throws IOException
- {
- long estimatedRows;
- try
- {
- estimatedRows = SSTable.estimateRowsFromData(desc, dfile);
- iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
- return estimatedRows;
- }
- catch(IOException e)
- {
- dfile.close();
- throw e;
- }
- }
-
- long index() throws IOException
- {
- try
- {
- return doIndexing();
- }
- finally
- {
- try
- {
- close();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
- }
-
- public void close() throws IOException
- {
- dfile.close();
- iwriter.close();
- }
-
- /*
- * If the key is cached, we should:
- * - For AES: run the newly received row by the cache
- * - For other: invalidate the cache (even if very unlikely, a key could be in cache in theory if a neighbor was boostrapped and
- * then removed quickly afterward (a key that we had lost but become responsible again could have stayed in cache). That key
- * would be obsolete and so we must invalidate the cache).
- */
- protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException
- {
- ColumnFamily cached = cfs.getRawCachedRow(key);
- if (cached != null)
- {
- switch (type)
- {
- case AES:
- if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
- {
- // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
- // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
- logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
- cfs.invalidateCachedRow(key);
- }
- else
- {
- ColumnFamily cf;
- if (row == null)
- {
- // If not provided, read from disk.
- long position = dfile.getFilePointer();
- cf = ColumnFamily.create(cfs.metadata);
- ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true);
- dfile.seek(position);
- }
- else
- {
- assert row instanceof PrecompactedRow;
- // we do not purge so we should not get a null here
- cf = ((PrecompactedRow)row).getFullColumnFamily();
- }
- cfs.updateRowCache(key, cf);
- }
- break;
- default:
- cfs.invalidateCachedRow(key);
- break;
- }
- }
- }
-
- protected long doIndexing() throws IOException
- {
- long rows = 0;
- DecoratedKey key;
- long rowPosition = 0;
- ColumnFamily cf = ColumnFamily.create(cfs.metadata);
- while (rowPosition < dfile.length())
- {
- // read key
- key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile));
- iwriter.afterAppend(key, rowPosition);
-
- // seek to next key
- long dataSize = SSTableReader.readRowSize(dfile, desc);
- rowPosition = dfile.getFilePointer() + dataSize;
-
- IndexHelper.skipBloomFilter(dfile);
- IndexHelper.skipIndex(dfile);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
-
- // We can't simply get the max column timestamp here by calling cf.maxTimestamp() because
- // the columns have not been deserialized yet. observeColumnsInSSTable() will deserialize
- // and get the max timestamp instead.
- ColumnFamily.serializer().observeColumnsInSSTable(cfs.metadata, dfile, sstableMetadataCollector);
-
- // don't move that statement around, it expects the dfile to be before the columns
- updateCache(key, dataSize, null);
-
- sstableMetadataCollector.addRowSize(dataSize);
-
- dfile.seek(rowPosition);
-
- rows++;
- }
- writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
- return rows;
- }
-
- public String toString()
- {
- return "RowIndexer(" + desc + ")";
- }
- }
-
- /*
- * When a sstable for a counter column family is streamed, we must ensure
- * that on the receiving node all counter column goes through the
- * deserialization from remote code path (i.e, it must be cleared from its
- * delta) to maintain the invariant that on a given node, only increments
- * that the node originated are delta (and copy of those must not be delta).
- *
- * Since after streaming row indexation goes through every streamed
- * sstable, we use this opportunity to ensure this property. This is the
- * goal of this specific CommutativeRowIndexer.
- */
- static class CommutativeRowIndexer extends RowIndexer
- {
- protected SequentialWriter writerDfile;
-
- CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
- {
- super(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
- writerDfile = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true);
- }
-
- @Override
- protected long doIndexing() throws IOException
- {
- long rows = 0L;
- DecoratedKey key;
-
- CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
- while (!dfile.isEOF())
- {
- // read key
- key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile));
-
- // skip data size, bloom filter, column index
- long dataSize = SSTableReader.readRowSize(dfile, desc);
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
-
- AbstractCompactedRow row = controller.getCompactedRow(iter);
- updateCache(key, dataSize, row);
-
- sstableMetadataCollector.addRowSize(dataSize);
- sstableMetadataCollector.addColumnCount(row.columnCount());
- sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp());
-
- // update index writer
- iwriter.afterAppend(key, writerDfile.getFilePointer());
- // write key and row
- ByteBufferUtil.writeWithShortLength(key.key, writerDfile.stream);
- row.write(writerDfile.stream);
-
- rows++;
- }
- writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
-
- if (writerDfile.getFilePointer() != dfile.getFilePointer())
- {
- // truncate file to new, reduced length
- writerDfile.truncate(writerDfile.getFilePointer());
- }
- writerDfile.sync();
-
- return rows;
- }
-
- @Override
- public void close() throws IOException
- {
- super.close();
- writerDfile.close();
- }
- }
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jul 26 04:30:43 2011
@@ -28,9 +28,6 @@ import org.apache.cassandra.gms.Gossiper
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
@@ -171,9 +168,6 @@ public class IncomingTcpConnection exten
private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
{
- if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
- new SSLIncomingStreamReader(streamHeader, socket, input).read();
- else
- new IncomingStreamReader(streamHeader, socket).read();
+ new IncomingStreamReader(streamHeader, socket).read();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Tue Jul 26 04:30:43 2011
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.security.streaming;
-
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.io.IOException;
-import java.io.DataInputStream;
-
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.IncomingStreamReader;
-import org.apache.cassandra.streaming.StreamHeader;
-
-/**
- * This class uses a DataInputStream to read data as opposed to a FileChannel.transferFrom
- * used by IncomingStreamReader because the underlying SSLServerSocket doesn't support
- * encrypting over NIO SocketChannel.
- */
-public class SSLIncomingStreamReader extends IncomingStreamReader
-{
- private final DataInputStream input;
-
- public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException
- {
- super(header, socket);
- this.input = input;
- }
-
- @Override
- protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
- {
- int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
- ByteBuffer buf = ByteBuffer.allocate(toRead);
- input.readFully(buf.array());
- fc.write(buf);
- bytesRead += buf.limit();
- remoteFile.progress += buf.limit();
- return bytesRead;
- }
-}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue Jul 26 04:30:43 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.streaming;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
@@ -52,12 +51,12 @@ public class IncomingStreamReader
protected final PendingFile localFile;
protected final PendingFile remoteFile;
- private final SocketChannel socketChannel;
protected final StreamInSession session;
+ private final Socket socket;
public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
- this.socketChannel = socket.getChannel();
+ this.socket = socket;
InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
session.addFiles(header.pendingFiles);
@@ -72,26 +71,19 @@ public class IncomingStreamReader
public void read() throws IOException
{
if (remoteFile != null)
- readFile();
-
- session.closeIfFinished();
- }
-
- protected void readFile() throws IOException
- {
- if (logger.isDebugEnabled())
{
- logger.debug("Receiving stream");
- logger.debug("Creating file for {} with {} estimated keys",
- localFile.getFilename(),
- remoteFile.estimatedKeys);
- }
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Receiving stream");
+ logger.debug("Creating file for {} with {} estimated keys",
+ localFile.getFilename(),
+ remoteFile.estimatedKeys);
+ }
- SSTableReader reader = null;
- if (remoteFile.estimatedKeys > 0)
- {
+ assert remoteFile.estimatedKeys > 0;
+ SSTableReader reader = null;
logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
- DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
+ DataInputStream dis = new DataInputStream(socket.getInputStream());
try
{
reader = streamIn(dis, localFile, remoteFile);
@@ -105,53 +97,11 @@ public class IncomingStreamReader
{
dis.close();
}
- }
- else
- {
- // backwards compatibility path
- FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
- FileChannel fc = fos.getChannel();
- long offset = 0;
- try
- {
- for (Pair<Long, Long> section : localFile.sections)
- {
- long length = section.right - section.left;
- long bytesRead = 0;
- while (bytesRead < length)
- {
- bytesRead = readnwrite(length, bytesRead, offset, fc);
- }
- offset += length;
- }
- }
- catch (IOException ex)
- {
- retry();
- throw ex;
- }
- finally
- {
- fc.close();
- }
+ session.finished(remoteFile, reader);
}
- session.finished(remoteFile, localFile, reader);
- }
-
- protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
- {
- long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
- long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
- // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
- // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and
- // raise an exception (that will be catch belove and 'the right thing' will be done).
- if (lastRead == 0)
- throw new IOException("Transfer failed for remote file " + remoteFile);
- bytesRead += lastRead;
- remoteFile.progress += lastRead;
- return bytesRead;
+ session.closeIfFinished();
}
private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
@@ -183,6 +133,8 @@ public class IncomingStreamReader
SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
AbstractCompactedRow row = controller.getCompactedRow(iter);
writer.append(row);
+ // row append does not update the max timestamp on its own
+ writer.updateMaxTimestamp(row.maxTimestamp());
if (row instanceof PrecompactedRow)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Jul 26 04:30:43 2011
@@ -50,7 +50,6 @@ public class StreamInSession
private final Pair<InetAddress, Long> context;
private final Runnable callback;
private String table;
- private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
@@ -103,22 +102,13 @@ public class StreamInSession
}
}
- public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
+ public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
{
if (logger.isDebugEnabled())
logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
- if (reader != null)
- {
- // SSTR was already built during streaming
- readers.add(reader);
- }
- else
- {
- Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
- buildFutures.add(future);
- }
-
+ assert reader != null;
+ readers.add(reader);
files.remove(remoteFile);
if (remoteFile.equals(current))
current = null;
@@ -143,33 +133,6 @@ public class StreamInSession
List<SSTableReader> referenced = new LinkedList<SSTableReader>();
try
{
- for (Future<SSTableReader> future : buildFutures)
- {
- try
- {
- SSTableReader sstable = future.get();
- assert sstable.getTableName().equals(table);
-
- // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
- sstable.acquireReference();
- referenced.add(sstable);
-
- ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- cfs.addSSTable(sstable);
- if (!cfstores.containsKey(cfs))
- cfstores.put(cfs, new ArrayList<SSTableReader>());
- cfstores.get(cfs).add(sstable);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
-
for (SSTableReader sstable : readers)
{
assert sstable.getTableName().equals(table);
Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Tue Jul 26 04:30:43 2011
@@ -73,33 +73,6 @@ public class LongCompactionSpeedTest ext
testCompaction(100, 800, 5);
}
- /**
- * Test aes counter repair with a very wide row.
- */
- @Test
- public void testAESCountersRepairWide() throws Exception
- {
- testAESCountersRepair(2, 1, 500000);
- }
-
- /**
- * Test aes counter repair with lots of skinny rows.
- */
- @Test
- public void testAESCountersRepairSlim() throws Exception
- {
- testAESCountersRepair(2, 500000, 1);
- }
-
- /**
- * Test aes counter repair with lots of small sstables.
- */
- @Test
- public void testAESCounterRepairMany() throws Exception
- {
- testAESCountersRepair(100, 1000, 5);
- }
-
protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception
{
CompactionManager.instance.disableAutoCompaction();
@@ -140,64 +113,4 @@ public class LongCompactionSpeedTest ext
colsPerRow,
System.currentTimeMillis() - start));
}
-
- protected void testAESCountersRepair(int sstableCount, final int rowsPerSSTable, final int colsPerRow) throws Exception
- {
- final String cfName = "Counter1";
- CompactionManager.instance.disableAutoCompaction();
-
- ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>();
- for (int k = 0; k < sstableCount; k++)
- {
- final int sstableNum = k;
- SSTableReader sstable = SSTableUtils.prepare().ks(TABLE1).cf(cfName).write(rowsPerSSTable, new SSTableUtils.Appender(){
- int written = 0;
- public boolean append(SSTableWriter writer) throws IOException
- {
- if (written > rowsPerSSTable)
- return false;
-
- DecoratedKey key = Util.dk(String.format("%020d", written));
- ColumnFamily cf = ColumnFamily.create(TABLE1, cfName);
- for (int i = 0; i < colsPerRow; i++)
- cf.addColumn(createCounterColumn(String.valueOf(i)));
- writer.append(key, cf);
- written++;
- return true;
- }
- });
-
- // whack the index to trigger the recover
- FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
- FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.FILTER));
-
- sstables.add(sstable);
- }
-
- // give garbage collection a bit of time to catch up
- Thread.sleep(1000);
-
- long start = System.currentTimeMillis();
-
- for (SSTableReader sstable : sstables)
- CompactionManager.instance.submitSSTableBuild(sstable.descriptor, OperationType.AES).get();
-
- System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
- this.getClass().getName(),
- sstableCount,
- rowsPerSSTable,
- colsPerRow,
- System.currentTimeMillis() - start));
- }
-
- protected CounterColumn createCounterColumn(String name)
- {
- ContextState context = ContextState.allocate(4, 1);
- context.writeElement(NodeId.fromInt(1), 4L, 2L, true);
- context.writeElement(NodeId.fromInt(2), 4L, 2L);
- context.writeElement(NodeId.fromInt(4), 3L, 3L);
- context.writeElement(NodeId.fromInt(8), 2L, 4L);
-
- return new CounterColumn(ByteBufferUtil.bytes(name), context.context, 0L);
- }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Tue Jul 26 04:30:43 2011
@@ -31,10 +31,12 @@ import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.Util;
+import static org.junit.Assert.assertEquals;
public class SSTableUtils
{
@@ -74,6 +76,48 @@ public class SSTableUtils
return datafile;
}
+ public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException
+ {
+ SSTableScanner slhs = lhs.getDirectScanner(2048);
+ SSTableScanner srhs = rhs.getDirectScanner(2048);
+ while (slhs.hasNext())
+ {
+ IColumnIterator ilhs = slhs.next();
+ assert srhs.hasNext() : "LHS contained more rows than RHS";
+ IColumnIterator irhs = srhs.next();
+ assertContentEquals(ilhs, irhs);
+ }
+ assert !srhs.hasNext() : "RHS contained more rows than LHS";
+ }
+
+ public static void assertContentEquals(IColumnIterator lhs, IColumnIterator rhs) throws IOException
+ {
+ assertEquals(lhs.getKey(), rhs.getKey());
+ // check metadata
+ ColumnFamily lcf = lhs.getColumnFamily();
+ ColumnFamily rcf = rhs.getColumnFamily();
+ if (lcf == null)
+ {
+ if (rcf == null)
+ return;
+ throw new AssertionError("LHS had no content for " + rhs.getKey());
+ }
+ else if (rcf == null)
+ throw new AssertionError("RHS had no content for " + lhs.getKey());
+ assertEquals(lcf.getMarkedForDeleteAt(), rcf.getMarkedForDeleteAt());
+ assertEquals(lcf.getLocalDeletionTime(), rcf.getLocalDeletionTime());
+ // iterate columns
+ while (lhs.hasNext())
+ {
+ IColumn clhs = lhs.next();
+ assert rhs.hasNext() : "LHS contained more columns than RHS for " + lhs.getKey();
+ IColumn crhs = rhs.next();
+
+ assertEquals("Mismatched columns for " + lhs.getKey(), clhs, crhs);
+ }
+ assert !rhs.hasNext() : "RHS contained more columns than LHS for " + lhs.getKey();
+ }
+
/**
* @return A Context with chainable methods to configure and write a SSTable.
*/
@@ -190,6 +234,7 @@ public class SSTableUtils
long start = System.currentTimeMillis();
while (appender.append(writer)) { /* pass */ }
SSTableReader reader = writer.closeAndOpenReader();
+ reader.acquireReference();
// mark all components for removal
if (cleanup)
for (Component component : reader.components)
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Jul 26 04:30:43 2011
@@ -20,7 +20,9 @@ package org.apache.cassandra.streaming;
*/
import static junit.framework.Assert.assertEquals;
+import org.apache.cassandra.Util;
import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.addMutation;
import java.net.InetAddress;
import java.util.*;
@@ -29,6 +31,7 @@ import org.apache.cassandra.CleanupHelpe
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -41,6 +44,7 @@ import org.apache.cassandra.thrift.Index
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NodeId;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -56,32 +60,26 @@ public class StreamingTransferTest exten
StorageService.instance.initServer();
}
- @Test
- public void testTransferTable() throws Exception
+ /**
+ * Create and transfer a single sstable, and return the keys that should have been transferred.
+ * The Mutator must create the given column, but it may also create any other columns it pleases.
+ *
+ * The Mutator is invoked for key1..key3 / col1..col3 with one fixed timestamp. The store is
+ * then flushed and compacted, and the test asserts that exactly one sstable exists. That
+ * sstable is unregistered from the store and handed to StreamOut.transferSSTables.
+ * Afterwards the method checks that one sstable is registered again, that the rows for
+ * key1 and key3 can be read back with their columns, and that the sstable reports the fixed
+ * timestamp as its max timestamp.
+ *
+ * @param table NOTE(review): no use of this parameter is visible in the lines shown here - confirm
+ * @param cfs the column family store that is written to, emptied, and re-checked after the transfer
+ * @param mutator called once per (key, col, timestamp) triple to write the row
+ * @return the keys expected to have been transferred: "key1" and "key3"
+ * @throws Exception anything thrown by the mutator or by the flush, compaction or streaming calls
+ */
+ private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception
{
- Table table = Table.open("Keyspace1");
- ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
-
// write a temporary SSTable, and unregister it
+ long timestamp = 1234; // fixed value so the max-timestamp check at the end has a known expectation
for (int i = 1; i <= 3; i++)
- {
- String key = "key" + i;
- String col = "col" + i;
- RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
- ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
- cf.addColumn(column(col, "v", 0));
- cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes((long) i), 0));
- rm.add(cf);
- rm.apply();
- }
+ mutator.mutate("key" + i, "col" + i, timestamp);
cfs.forceBlockingFlush();
- assert cfs.getSSTables().size() == 1;
+ Util.compactAll(cfs).get(); // presumably merges whatever the mutator produced into one sstable (the counter test adds one per key) - the next line asserts the result
+ assertEquals(1, cfs.getSSTables().size());
SSTableReader sstable = cfs.getSSTables().iterator().next();
// We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
sstable.acquireReference();
cfs.removeAllSSTables();
// transfer the first and last key
+ int[] offs = new int[]{1, 3}; // numeric suffixes of the keys/columns expected to arrive: key1/col1 and key3/col3
IPartitioner p = StorageService.getPartitioner();
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
@@ -90,27 +88,133 @@ public class StreamingTransferTest exten
StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
session.await();
- // confirm that the SSTable was transferred and registered
- List<Row> rows = Util.getRangeSlice(cfs);
- assertEquals(2, rows.size());
- assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key1"));
- assert rows.get(1).key.key.equals( ByteBufferUtil.bytes("key3"));
- assertEquals(2, rows.get(0).cf.getColumnsMap().size());
- assertEquals(2, rows.get(1).cf.getColumnsMap().size());
- assert rows.get(1).cf.getColumn(ByteBufferUtil.bytes("col3")) != null;
+ // confirm that a single SSTable was transferred and registered
+ assertEquals(1, cfs.getSSTables().size());
// and that the index and filter were properly recovered
- assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key1"), new QueryPath(cfs.columnFamily)));
- assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new QueryPath(cfs.columnFamily)));
+ List<Row> rows = Util.getRangeSlice(cfs);
+ assertEquals(offs.length, rows.size());
+ for (int i = 0; i < offs.length; i++)
+ {
+ String key = "key" + offs[i];
+ String col = "col" + offs[i];
+ assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key),
+ new QueryPath(cfs.columnFamily)));
+ assert rows.get(i).key.key.equals(ByteBufferUtil.bytes(key)); // rows are expected in the same order as offs
+ assert rows.get(i).cf.getColumn(ByteBufferUtil.bytes(col)) != null;
+ }
- // and that the secondary index works
- IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(3L));
- IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
- IFilter filter = new IdentityQueryFilter();
- Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
- rows = cfs.scan(clause, range, filter);
- assertEquals(1, rows.size());
- assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key3")) ;
+ // and that the max timestamp for the file was rediscovered
+ assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp());
+
+ List<String> keys = new ArrayList<String>();
+ for (int off : offs)
+ keys.add("key" + off);
+ return keys;
+ }
+ /**
+ * Transfers rows written to the Indexed1 column family and then checks that a lookup on
+ * the "birthdate" column finds each transferred key again, exactly once.
+ * The generic transfer checks are the ones performed inside createAndTransfer.
+ */
+ @Test
+ public void testTransferTable() throws Exception
+ {
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
+
+ List<String> keys = createAndTransfer(table, cfs, new Mutator()
+ {
+ public void mutate(String key, String col, long timestamp) throws Exception
+ {
+ long val = key.hashCode(); // value derived from the key, so the lookup loop below can recompute it per key
+ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+ ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
+ cf.addColumn(column(col, "v", timestamp));
+ cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
+ rm.add(cf);
+ rm.apply();
+ }
+ });
+
+ // confirm that the secondary index was recovered: one EQ lookup on "birthdate" per transferred key
+ for (String key : keys)
+ {
+ long val = key.hashCode(); // same derivation as in the mutator above
+ IPartitioner p = StorageService.getPartitioner();
+ IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
+ IndexOperator.EQ,
+ ByteBufferUtil.bytes(val));
+ IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+ IFilter filter = new IdentityQueryFilter();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); // NOTE(review): min-to-min presumably means the whole ring - confirm
+ List<Row> rows = cfs.scan(clause, range, filter);
+ assertEquals(1, rows.size());
+ assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key)); // plain assert: only enforced when run with -ea
+ }
+ }
+ /**
+ * Runs the create-and-transfer round trip against the Super1 column family.
+ * The keys returned by createAndTransfer are not used; the only checks are the ones
+ * performed inside createAndTransfer.
+ */
+ @Test
+ public void testTransferTableSuper() throws Exception
+ {
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Super1");
+
+ createAndTransfer(table, cfs, new Mutator()
+ {
+ public void mutate(String key, String col, long timestamp) throws Exception
+ {
+ RowMutation rm = new RowMutation(table.name, ByteBufferUtil.bytes(key));
+ addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp); // presumably super column col, sub-column 1, value "val1" - verify against Util.addMutation
+ rm.apply();
+ }
+ });
+ }
+ /**
+ * Streams counter columns from the Counter1 column family and compares the streamed sstable
+ * with a locally written one whose counter contexts were passed through
+ * CounterContext.clearAllDelta. The test therefore expects the streamed content to equal
+ * the delta-cleared content for the transferred keys.
+ */
+ @Test
+ public void testTransferTableCounter() throws Exception
+ {
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Counter1");
+ final CounterContext cc = new CounterContext();
+
+ final Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>(); // filled by the mutator: key -> expected row after clearAllDelta
+
+ List<String> keys = createAndTransfer(table, cfs, new Mutator()
+ {
+ /** Creates a new SSTable per key: all will be merged before streaming. */
+ public void mutate(String key, String col, long timestamp) throws Exception
+ {
+ Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
+ ColumnFamily cf = ColumnFamily.create(cfs.metadata);
+ ColumnFamily cfCleaned = ColumnFamily.create(cfs.metadata);
+ CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1); // presumably 4 elements, 1 of them a delta - TODO confirm against ContextState.allocate
+ state.writeElement(NodeId.fromInt(2), 9L, 3L, true); // the only element written with the extra boolean argument
+ state.writeElement(NodeId.fromInt(4), 4L, 2L);
+ state.writeElement(NodeId.fromInt(6), 3L, 3L);
+ state.writeElement(NodeId.fromInt(8), 2L, 4L);
+ cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
+ state.context,
+ timestamp));
+ cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
+ cc.clearAllDelta(state.context),
+ timestamp));
+
+ entries.put(key, cf);
+ cleanedEntries.put(key, cfCleaned);
+ cfs.addSSTable(SSTableUtils.prepare()
+ .ks(table.name)
+ .cf(cfs.columnFamily)
+ .generation(0)
+ .write(entries));
+ }
+ });
+
+ // filter pre-cleaned entries locally, and ensure that the end result is equal
+ cleanedEntries.keySet().retainAll(keys); // keep only the keys createAndTransfer reports as transferred
+ SSTableReader cleaned = SSTableUtils.prepare()
+ .ks(table.name)
+ .cf(cfs.columnFamily)
+ .generation(0)
+ .write(cleanedEntries);
+ SSTableReader streamed = cfs.getSSTables().iterator().next();
+ SSTableUtils.assertContentEquals(cleaned, streamed);
 }
@Test
@@ -208,4 +312,9 @@ public class StreamingTransferTest exten
assertEquals(entry.getKey(), rows.get(0).key);
}
}
+ /**
+ * Callback that createAndTransfer invokes once per key to write that key's row.
+ * Implementations must create the given column and may create further columns.
+ */
+ public interface Mutator
+ {
+ public void mutate(String key, String col, long timestamp) throws Exception; // key: row key; col: column name that must be created; timestamp: timestamp passed in by createAndTransfer
+ }
}