You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/06 20:53:40 UTC
[1/2] cassandra git commit: Add checksum to saved cache files
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 39ee04088 -> a472aa9ea
Add checksum to saved cache files
patch by Daniel Chia; reviewed by Ariel Weisberg for CASSANDRA-9265
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa6205c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa6205c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa6205c9
Branch: refs/heads/cassandra-3.0
Commit: fa6205c909656b09165da4b5ca469328a6450917
Parents: 7636a6b
Author: Daniel Chia <da...@gmail.com>
Authored: Wed Aug 5 18:46:30 2015 -0400
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 21:48:14 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 96 ++++++++------
.../cassandra/config/DatabaseDescriptor.java | 13 +-
.../io/util/ChecksummedRandomAccessReader.java | 103 +++++++++++++++
.../io/util/DataIntegrityMetadata.java | 16 ++-
.../io/ChecksummedRandomAccessReaderTest.java | 127 +++++++++++++++++++
6 files changed, 311 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 72ad3cd..ff0fdda 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.1
+ * Add checksum to saved cache files (CASSANDRA-9265)
* Log warning when using an aggregate without partition key (CASSANDRA-9737)
* Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
* UDF / UDA execution time in trace (CASSANDRA-9723)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index a204a18..05653ba 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
@@ -48,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
- public InputStream getInputStream(File path) throws FileNotFoundException;
- public OutputStream getOutputStream(File path) throws FileNotFoundException;
+ public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+ public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
}
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -61,18 +62,18 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
protected final CacheService.CacheType cacheType;
private final CacheSerializer<K, V> cacheLoader;
- private static final String CURRENT_VERSION = "b";
+ private static final String CURRENT_VERSION = "c";
private static volatile IStreamFactory streamFactory = new IStreamFactory()
{
- public InputStream getInputStream(File path) throws FileNotFoundException
+ public InputStream getInputStream(File dataPath, File crcPath) throws IOException
{
- return new FileInputStream(path);
+ return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
- public OutputStream getOutputStream(File path) throws FileNotFoundException
+ public SequentialWriter getOutputWriter(File dataPath, File crcPath)
{
- return new FileOutputStream(path);
+ return SequentialWriter.open(dataPath, crcPath);
}
};
@@ -89,10 +90,16 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
this.cacheLoader = cacheloader;
}
- public File getCachePath(UUID cfId, String version)
+ public File getCacheDataPath(UUID cfId, String version)
{
Pair<String, String> names = Schema.instance.getCF(cfId);
- return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version);
+ return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
+ }
+
+ public File getCacheCrcPath(UUID cfId, String version)
+ {
+ Pair<String, String> names = Schema.instance.getCF(cfId);
+ return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
}
public Writer getWriter(int keysToSave)
@@ -129,14 +136,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
// modern format, allows both key and value (so key cache load can be purely sequential)
- File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
- if (path.exists())
+ File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
+ File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+ if (dataPath.exists() && crcPath.exists())
{
DataInputStream in = null;
try
{
- logger.info(String.format("reading saved cache %s", path));
- in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+ logger.info(String.format("reading saved cache %s", dataPath));
+ in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
while (in.available() > 0)
{
@@ -155,10 +163,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
put(entry.left, entry.right);
}
}
+ catch (CorruptFileException e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
+ }
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
- logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
+ logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
}
finally
{
@@ -167,7 +180,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
if (logger.isDebugEnabled())
logger.debug("completed reading ({} ms; {} keys) saved cache {}",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, path);
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, dataPath);
return count;
}
@@ -241,9 +254,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
- HashMap<UUID, OutputStream> streams = new HashMap<>();
- HashMap<UUID, File> paths = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
+ HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+ HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
try
{
@@ -254,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- DataOutputPlus writer = writers.get(cfId);
+ DataOutputPlus writer = dataOutputs.get(cfId);
if (writer == null)
{
- File writerPath = tempCacheFile(cfId);
- OutputStream stream;
+ Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
+ SequentialWriter sequentialWriter;
try
{
- stream = streamFactory.getOutputStream(writerPath);
- writer = new WrappedDataOutputStreamPlus(stream);
+ sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
+ writer = new WrappedDataOutputStreamPlus(sequentialWriter);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
- paths.put(cfId, writerPath);
- streams.put(cfId, stream);
- writers.put(cfId, writer);
+ paths.put(cfId, cacheFilePaths);
+ sequentialWriters.put(cfId, sequentialWriter);
+ dataOutputs.put(cfId, writer);
}
try
@@ -279,7 +292,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
catch (IOException e)
{
- throw new FSWriteError(e, paths.get(cfId));
+ throw new FSWriteError(e, paths.get(cfId).left);
}
keysWritten++;
@@ -299,29 +312,40 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
// not thrown (by OHC)
}
- for (OutputStream writer : streams.values())
+ for (SequentialWriter writer : sequentialWriters.values())
+ {
+ writer.finish();
FileUtils.closeQuietly(writer);
+ }
}
- for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
{
UUID cfId = entry.getKey();
- File tmpFile = paths.get(cfId);
- File cacheFile = getCachePath(cfId, CURRENT_VERSION);
+ Pair<File, File> tmpFiles = paths.get(cfId);
+ File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
+ File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
cacheFile.delete(); // ignore error if it didn't exist
- if (!tmpFile.renameTo(cacheFile))
- logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
+ crcFile.delete();
+
+ if (!tmpFiles.left.renameTo(cacheFile))
+ logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
+
+ if (!tmpFiles.right.renameTo(crcFile))
+ logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
}
logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
- private File tempCacheFile(UUID cfId)
+ private Pair<File, File> tempCacheFiles(UUID cfId)
{
- File path = getCachePath(cfId, CURRENT_VERSION);
- return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
+ File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
+ File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
+ return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
+ FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
}
private void deleteOldCacheFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5589bc2..e7c76ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1122,7 +1122,7 @@ public class DatabaseDescriptor
{
return conf.commitlog_segment_size_in_mb * 1024 * 1024;
}
-
+
public static void setCommitLogSegmentSize(int sizeMegabytes)
{
conf.commitlog_segment_size_in_mb = sizeMegabytes;
@@ -1271,7 +1271,7 @@ public class DatabaseDescriptor
{
return conf.commitlog_sync_period_in_ms;
}
-
+
public static void setCommitLogSyncPeriod(int periodMillis)
{
conf.commitlog_sync_period_in_ms = periodMillis;
@@ -1405,14 +1405,19 @@ public class DatabaseDescriptor
return conf.max_hint_window_in_ms;
}
- public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version)
+ public static File getSerializedCachePath(String ksName,
+ String cfName,
+ UUID cfId,
+ CacheService.CacheType cacheType,
+ String version,
+ String extension)
{
StringBuilder builder = new StringBuilder();
builder.append(ksName).append('-');
builder.append(cfName).append('-');
builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
builder.append(cacheType);
- builder.append((version == null ? "" : "-" + version + ".db"));
+ builder.append((version == null ? "" : "-" + version + "." + extension));
return new File(conf.saved_caches_directory, builder.toString());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
new file mode 100644
index 0000000..60b193a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ChecksummedRandomAccessReader extends RandomAccessReader
+{
+ @SuppressWarnings("serial")
+ public static class CorruptFileException extends RuntimeException
+ {
+ public final File file;
+
+ public CorruptFileException(Exception cause, File file) {
+ this.file = file;
+ }
+ }
+
+ private final DataIntegrityMetadata.ChecksumValidator validator;
+ private final File file;
+
+ protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException {
+ super(channel, validator.chunkSize, -1, BufferType.ON_HEAP, null);
+ this.validator = validator;
+ this.file = file;
+ }
+
+ public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
+ {
+ try (ChannelProxy channel = new ChannelProxy(file))
+ {
+ RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
+ DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
+ crcReader,
+ file.getPath());
+ return new ChecksummedRandomAccessReader(file, channel, validator);
+ }
+ }
+
+ protected void reBuffer()
+ {
+ long desiredPosition = current();
+ // align with buffer size, as checksums were computed in chunks of buffer size each.
+ bufferOffset = (desiredPosition / buffer.capacity()) * buffer.capacity();
+
+ buffer.clear();
+
+ long position = bufferOffset;
+ while (buffer.hasRemaining())
+ {
+ int n = channel.read(buffer, position);
+ if (n < 0)
+ break;
+ position += n;
+ }
+
+ buffer.flip();
+
+ try
+ {
+ validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+ }
+ catch (IOException e)
+ {
+ throw new CorruptFileException(e, file);
+ }
+
+ buffer.position((int) (desiredPosition - bufferOffset));
+ }
+
+ public void seek(long newPosition)
+ {
+ validator.seek(newPosition);
+ super.seek(newPosition);
+ }
+
+ public void close()
+ {
+ super.close();
+ validator.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..d44bd1c 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -48,14 +48,20 @@ public class DataIntegrityMetadata
{
private final Checksum checksum;
private final RandomAccessReader reader;
- private final Descriptor descriptor;
public final int chunkSize;
+ private final String dataFilename;
public ChecksumValidator(Descriptor descriptor) throws IOException
{
- this.descriptor = descriptor;
- checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create();
- reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
+ this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(),
+ RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
+ descriptor.filenameFor(Component.DATA));
+ }
+
+ public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException {
+ this.checksum = checksum;
+ this.reader = reader;
+ this.dataFilename = dataFilename;
chunkSize = reader.readInt();
}
@@ -78,7 +84,7 @@ public class DataIntegrityMetadata
checksum.reset();
int actual = reader.readInt();
if (current != actual)
- throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
+ throw new IOException("Corrupted File : " + dataFilename);
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
new file mode 100644
index 0000000..c1e43c9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
+import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class ChecksummedRandomAccessReaderTest
+{
+ @Test
+ public void readFully() throws IOException
+ {
+ final File data = File.createTempFile("testReadFully", "data");
+ final File crc = File.createTempFile("testReadFully", "crc");
+
+ final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering.
+ ThreadLocalRandom.current().nextBytes(expected);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(expected);
+ writer.finish();
+
+ assert data.exists();
+
+ RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+ byte[] b = new byte[expected.length];
+ reader.readFully(b);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+
+ @Test
+ public void seek() throws IOException
+ {
+ final File data = File.createTempFile("testSeek", "data");
+ final File crc = File.createTempFile("testSeek", "crc");
+
+ final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size
+ ThreadLocalRandom.current().nextBytes(dataBytes);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(dataBytes);
+ writer.finish();
+
+ assert data.exists();
+
+ RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+
+ final int seekPosition = 66000;
+ reader.seek(seekPosition);
+
+ byte[] b = new byte[dataBytes.length - seekPosition];
+ reader.readFully(b);
+
+ byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+
+ @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
+ public void corruptionDetection() throws IOException
+ {
+ final File data = File.createTempFile("corruptionDetection", "data");
+ final File crc = File.createTempFile("corruptionDetection", "crc");
+
+ final byte[] expected = new byte[5 * 1024];
+ Arrays.fill(expected, (byte) 0);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(expected);
+ writer.finish();
+
+ assert data.exists();
+
+ // simulate corruption of file
+ try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
+ {
+ dataFile.seek(1024);
+ dataFile.write((byte) 5);
+ }
+
+ RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+ byte[] b = new byte[expected.length];
+ reader.readFully(b);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+}
[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a472aa9e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a472aa9e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a472aa9e
Branch: refs/heads/cassandra-3.0
Commit: a472aa9eaaff3f67035c53dd92b4aee24f2bed36
Parents: 39ee040 fa6205c
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Aug 6 21:54:20 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 21:54:20 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 96 ++++++++------
.../cassandra/config/DatabaseDescriptor.java | 13 +-
.../io/util/ChecksummedRandomAccessReader.java | 110 ++++++++++++++++
.../io/util/DataIntegrityMetadata.java | 17 ++-
.../io/ChecksummedRandomAccessReaderTest.java | 127 +++++++++++++++++++
6 files changed, 319 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 95fade9,ff0fdda..a894297
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,6 +1,44 @@@
-2.2.1
+3.0.0-beta1
+ * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
+ * Repair improvements when using vnodes (CASSANDRA-5220)
+ * Disable scripted UDFs by default (CASSANDRA-9889)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+ * Bytecode inspection for Java-UDFs (CASSANDRA-9890)
+ * Use byte to serialize MT hash length (CASSANDRA-9792)
+Merged from 2.2:
+ * Add checksum to saved cache files (CASSANDRA-9265)
* Log warning when using an aggregate without partition key (CASSANDRA-9737)
+Merged from 2.1:
+ * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
+Merged from 2.0:
+ * Don't cast expected bf size to an int (CASSANDRA-9959)
+
+
+3.0.0-alpha1
+ * Implement proper sandboxing for UDFs (CASSANDRA-9402)
+ * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
+ * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
+ * Metrics should use up to date nomenclature (CASSANDRA-9448)
+ * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
+ * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
+ * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
+ 9848, 9705, 9859, 9867, 9874, 9828, 9801)
+ * Update Guava to 18.0 (CASSANDRA-9653)
+ * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
+ * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
+ * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
+ * Add algorithmic token allocation (CASSANDRA-7032)
+ * Add nodetool command to replay batchlog (CASSANDRA-9547)
+ * Make file buffer cache independent of paths being read (CASSANDRA-8897)
+ * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
+ * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
+ * Change gossip stabilization to use endpoint size (CASSANDRA-9401)
+ * Change default garbage collector to G1 (CASSANDRA-7486)
+ * Populate TokenMetadata early during startup (CASSANDRA-9317)
+ * Undeprecate cache recentHitRate (CASSANDRA-6591)
+ * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
+ * Materialized Views (CASSANDRA-6477)
+Merged from 2.2:
* Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
* UDF / UDA execution time in trace (CASSANDRA-9723)
* Fix broken internode SSL (CASSANDRA-9884)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 0b334f5,05653ba..3c5b6a5
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -38,7 -39,7 +38,8 @@@ import org.apache.cassandra.db.compacti
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
+ import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
@@@ -129,14 -136,15 +136,15 @@@ public class AutoSavingCache<K extends
long start = System.nanoTime();
// modern format, allows both key and value (so key cache load can be purely sequential)
- File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
- if (path.exists())
+ File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
+ File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+ if (dataPath.exists() && crcPath.exists())
{
- DataInputStream in = null;
+ DataInputStreamPlus in = null;
try
{
- logger.info(String.format("reading saved cache %s", path));
- in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+ logger.info(String.format("reading saved cache %s", dataPath));
- in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
++ in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
while (in.available() > 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 0000000,60b193a..976ff23
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@@ -1,0 -1,103 +1,110 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.io.util;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.zip.Adler32;
+
+ import org.apache.cassandra.io.compress.BufferType;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+
+ public class ChecksummedRandomAccessReader extends RandomAccessReader
+ {
+ @SuppressWarnings("serial")
+ public static class CorruptFileException extends RuntimeException
+ {
+ public final File file;
+
- public CorruptFileException(Exception cause, File file) {
++ public CorruptFileException(Exception cause, File file)
++ {
+ this.file = file;
+ }
+ }
+
+ private final DataIntegrityMetadata.ChecksumValidator validator;
+ private final File file;
+
- protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException {
- super(channel, validator.chunkSize, -1, BufferType.ON_HEAP, null);
++ protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException
++ {
++ super(channel, validator.chunkSize, -1, BufferType.ON_HEAP);
+ this.validator = validator;
+ this.file = file;
+ }
+
+ public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
+ {
- try (ChannelProxy channel = new ChannelProxy(file))
- {
- RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
- DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
- crcReader,
- file.getPath());
- return new ChecksummedRandomAccessReader(file, channel, validator);
- }
++ ChannelProxy channel = new ChannelProxy(file);
++ RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
++ DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
++ crcReader,
++ file.getPath());
++ return new ChecksummedRandomAccessReader(file, channel, validator);
+ }
+
+ protected void reBuffer()
+ {
+ long desiredPosition = current();
+ // align with buffer size, as checksums were computed in chunks of buffer size each.
+ bufferOffset = (desiredPosition / buffer.capacity()) * buffer.capacity();
+
+ buffer.clear();
+
+ long position = bufferOffset;
+ while (buffer.hasRemaining())
+ {
+ int n = channel.read(buffer, position);
+ if (n < 0)
+ break;
+ position += n;
+ }
+
+ buffer.flip();
+
+ try
+ {
+ validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+ }
+ catch (IOException e)
+ {
+ throw new CorruptFileException(e, file);
+ }
+
+ buffer.position((int) (desiredPosition - bufferOffset));
+ }
+
+ public void seek(long newPosition)
+ {
+ validator.seek(newPosition);
+ super.seek(newPosition);
+ }
+
+ public void close()
+ {
- super.close();
- validator.close();
++ try
++ {
++ super.close();
++ }
++ finally
++ {
++ channel.close();
++ validator.close();
++ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 073fc04,d44bd1c..ac2ab47
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@@ -53,9 -53,15 +53,16 @@@ public class DataIntegrityMetadat
public ChecksumValidator(Descriptor descriptor) throws IOException
{
- this.descriptor = descriptor;
- checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32();
- reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
- this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(),
++ this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32(),
+ RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
+ descriptor.filenameFor(Component.DATA));
+ }
+
- public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException {
++ public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException
++ {
+ this.checksum = checksum;
+ this.reader = reader;
+ this.dataFilename = dataFilename;
chunkSize = reader.readInt();
}