You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/01/15 19:22:59 UTC
[05/15] cassandra git commit: Revert CASSANDRA-10012 and add more
loggings
Revert CASSANDRA-10012 and add more loggings
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58a0079c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58a0079c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58a0079c
Branch: refs/heads/cassandra-3.3
Commit: 58a0079c391d12dab97e036f05be070dfaddcc95
Parents: abe0c67
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Jan 15 12:04:32 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jan 15 12:09:56 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/composites/AbstractCType.java | 3 +-
.../cassandra/streaming/ConnectionHandler.java | 2 +-
.../cassandra/streaming/StreamReader.java | 26 ++++++++---
.../cassandra/streaming/StreamWriter.java | 9 ++++
.../compress/CompressedInputStream.java | 45 +++++++------------
.../compress/CompressedStreamReader.java | 31 +++++++++----
.../compress/CompressedStreamWriter.java | 12 +++++
.../compress/CompressedInputStreamTest.java | 46 --------------------
9 files changed, 83 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b87ed0..3d84a30 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.13
+ * Revert CASSANDRA-10012 and add more logging (CASSANDRA-10961)
* Allow simultaneous bootstrapping with strict consistency when no vnodes are used (CASSANDRA-11005)
* Log a message when major compaction does not result in a single file (CASSANDRA-10847)
* (cqlsh) fix cqlsh_copy_tests when vnodes are disabled (CASSANDRA-10997)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index 5af7458..fecc847 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -375,7 +375,8 @@ public abstract class AbstractCType implements CType
protected static void checkRemaining(ByteBuffer bb, int offs, int length)
{
if (offs + length > bb.limit())
- throw new IllegalArgumentException("Not enough bytes");
+ throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+ offs, length, bb.limit()));
}
private static class Serializer implements CType.Serializer
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index aa3504a..ac267f9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -248,11 +248,11 @@ public class ConnectionHandler
{
// receive message
StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+ logger.debug("[Stream #{}] Received {}", session.planId(), message);
// Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
// to ignore here since we'll have asked for a retry.
if (message != null)
{
- logger.debug("[Stream #{}] Received {}", session.planId(), message);
session.messageReceived(message);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 18013fe..1e3ba7f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -58,6 +58,7 @@ public class StreamReader
protected final StreamSession session;
protected final Descriptor.Version inputVersion;
protected final long repairedAt;
+ protected final int fileSeqNum;
protected Descriptor desc;
@@ -69,6 +70,7 @@ public class StreamReader
this.sections = header.sections;
this.inputVersion = new Descriptor.Version(header.version);
this.repairedAt = header.repairedAt;
+ this.fileSeqNum = header.sequenceNumber;
}
/**
@@ -78,33 +80,46 @@ public class StreamReader
*/
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
- logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
- if (kscf == null)
+ ColumnFamilyStore cfs = null;
+ if (kscf != null)
+ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ if (kscf == null || cfs == null)
{
// schema was dropped during streaming
throw new IOException("CF " + cfId + " was dropped during streaming");
}
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+ session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+ cfs.getColumnFamilyName());
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
SSTableWriter writer = null;
+ DecoratedKey key = null;
try
{
writer = createWriter(cfs, totalSize, repairedAt);
while (in.getBytesRead() < totalSize)
{
- writeRow(writer, in, cfs);
+ key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ writeRow(key, writer, in, cfs);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
+ logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+ session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
return writer;
}
catch (Throwable e)
{
+ if (key != null)
+ logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+ session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
if (writer != null)
{
try
@@ -162,9 +177,8 @@ public class StreamReader
return size;
}
- protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+ protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
{
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
writer.appendFromStream(key, cfs.metadata, in, inputVersion);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 43bc26a..2579414 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -24,6 +24,9 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.ning.compress.lzf.LZFOutputStream;
import org.apache.cassandra.io.sstable.Component;
@@ -42,6 +45,8 @@ public class StreamWriter
{
private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
+ private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
protected final SSTableReader sstable;
protected final Collection<Pair<Long, Long>> sections;
protected final StreamRateLimiter limiter;
@@ -71,6 +76,8 @@ public class StreamWriter
public void write(WritableByteChannel channel) throws IOException
{
long totalSize = totalSize();
+ logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
RandomAccessReader file = sstable.openDataReader();
ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
@@ -109,6 +116,8 @@ public class StreamWriter
// make sure that current section is send
compressedOutput.flush();
}
+ logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+ session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index b4a3065..6280ccd 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -31,6 +31,9 @@ import java.util.zip.Checksum;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -40,12 +43,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
*/
public class CompressedInputStream extends InputStream
{
+
+ private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
private final CompressionInfo info;
// chunk buffer
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private final byte[] buffer;
+ private byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,8 +70,6 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
- private Thread readerThread;
-
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -77,10 +81,9 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
- readerThread = new Thread(new Reader(source, info, dataBuffer));
- readerThread.start();
+ new Thread(new Reader(source, info, dataBuffer)).start();
}
public int read() throws IOException
@@ -146,7 +149,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- class Reader extends WrappedRunnable
+ static class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -162,7 +165,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+ while (chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -172,43 +175,25 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r;
try
{
- r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
if (r < 0)
{
dataBuffer.put(POISON_PILL);
return; // throw exception where we consume dataBuffer
}
+ bufferRead += r;
}
catch (IOException e)
{
+ logger.warn("Error while reading compressed input stream.", e);
dataBuffer.put(POISON_PILL);
- throw e;
+ return; // throw exception where we consume dataBuffer
}
- bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
- synchronized(CompressedInputStream.this)
- {
- readerThread = null;
- }
- }
- }
-
- @Override
- public void close() throws IOException
- {
- synchronized(this)
- {
- if (readerThread != null)
- {
- readerThread.interrupt();
- readerThread = null;
- }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4f60773..fd0d9c8 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,6 +24,7 @@ import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
+import org.apache.cassandra.db.DecoratedKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +33,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
@@ -61,40 +64,56 @@ public class CompressedStreamReader extends StreamReader
@Override
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
- logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
- if (kscf == null)
+ ColumnFamilyStore cfs = null;
+ if (kscf != null)
+ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ if (kscf == null || cfs == null)
{
// schema was dropped during streaming
throw new IOException("CF " + cfId + " was dropped during streaming");
}
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+ session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+ cfs.getColumnFamilyName());
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
SSTableWriter writer = null;
+ DecoratedKey key = null;
try
{
writer = createWriter(cfs, totalSize, repairedAt);
+ int sectionIdx = 0;
for (Pair<Long, Long> section : sections)
{
long length = section.right - section.left;
// skip to beginning of section inside chunk
cis.position(section.left);
in.reset(0);
+ logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, length);
while (in.getBytesRead() < length)
{
- writeRow(writer, in, cfs);
+ key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ writeRow(key, writer, in, cfs);
+
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
}
}
+ logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+ session.peer, cis.getTotalCompressedBytesRead(), totalSize);
return writer;
}
catch (Throwable e)
{
+ if (key != null)
+ logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+ session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
if (writer != null)
{
try
@@ -113,10 +132,6 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
- finally
- {
- cis.close();
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 001c927..6fe08e6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,6 +24,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -40,6 +43,8 @@ public class CompressedStreamWriter extends StreamWriter
{
public static final int CHUNK_SIZE = 10 * 1024 * 1024;
+ private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
private final CompressionInfo compressionInfo;
public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -52,12 +57,15 @@ public class CompressedStreamWriter extends StreamWriter
public void write(WritableByteChannel channel) throws IOException
{
long totalSize = totalSize();
+ logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
RandomAccessReader file = sstable.openDataReader();
FileChannel fc = file.getChannel();
long progress = 0L;
// calculate chunks to transfer. we want to send continuous chunks altogether.
List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+ int sectionIdx = 0;
try
{
// stream each of the required sections of the file
@@ -65,6 +73,8 @@ public class CompressedStreamWriter extends StreamWriter
{
// length of the section to stream
long length = section.right - section.left;
+ logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
// tracks write progress
long bytesTransferred = 0;
while (bytesTransferred < length)
@@ -77,6 +87,8 @@ public class CompressedStreamWriter extends StreamWriter
session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
}
}
+ logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+ session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 87e0003..c70b932 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
}
/**
- * Test CompressedInputStream not hang when closed while reading
- * @throws Exception
- */
- @Test(expected = EOFException.class)
- public void testClose() throws Exception
- {
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
- CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
- final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
- InputStream blockingInput = new InputStream()
- {
- @Override
- public int read() throws IOException
- {
- try
- {
- // 10 second cut off not to stop other test in case
- return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
- }
- catch (InterruptedException e)
- {
- throw new IOException("Interrupted as expected", e);
- }
- }
- };
- CompressionInfo info = new CompressionInfo(chunks, param);
- try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
- {
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- cis.close();
- }
- catch (Exception ignore) {}
- }
- }).start();
- // block here
- cis.read();
- }
- }
-
- /**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
*/