Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/06 20:49:33 UTC

cassandra git commit: Add checksum to saved cache files

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 7636a6b86 -> fa6205c90


Add checksum to saved cache files

patch by Daniel Chia; reviewed by Ariel Weisberg for CASSANDRA-9265


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa6205c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa6205c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa6205c9

Branch: refs/heads/cassandra-2.2
Commit: fa6205c909656b09165da4b5ca469328a6450917
Parents: 7636a6b
Author: Daniel Chia <da...@gmail.com>
Authored: Wed Aug 5 18:46:30 2015 -0400
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 21:48:14 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |  96 ++++++++------
 .../cassandra/config/DatabaseDescriptor.java    |  13 +-
 .../io/util/ChecksummedRandomAccessReader.java  | 104 +++++++++++++++
 .../io/util/DataIntegrityMetadata.java          |  16 ++-
 .../io/ChecksummedRandomAccessReaderTest.java   | 127 +++++++++++++++++++
 6 files changed, 312 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 72ad3cd..ff0fdda 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.1
+ * Add checksum to saved cache files (CASSANDRA-9265)
  * Log warning when using an aggregate without partition key (CASSANDRA-9737)
  * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
  * UDF / UDA execution time in trace (CASSANDRA-9723)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index a204a18..05653ba 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -48,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File path) throws FileNotFoundException;
-        public OutputStream getOutputStream(File path) throws FileNotFoundException;
+        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -61,18 +62,18 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     protected final CacheService.CacheType cacheType;
 
     private final CacheSerializer<K, V> cacheLoader;
-    private static final String CURRENT_VERSION = "b";
+    private static final String CURRENT_VERSION = "c";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {
-        public InputStream getInputStream(File path) throws FileNotFoundException
+        public InputStream getInputStream(File dataPath, File crcPath) throws IOException
         {
-            return new FileInputStream(path);
+            return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public OutputStream getOutputStream(File path) throws FileNotFoundException
+        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
         {
-            return new FileOutputStream(path);
+            return SequentialWriter.open(dataPath, crcPath);
         }
     };
 
@@ -89,10 +90,16 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         this.cacheLoader = cacheloader;
     }
 
-    public File getCachePath(UUID cfId, String version)
+    public File getCacheDataPath(UUID cfId, String version)
     {
         Pair<String, String> names = Schema.instance.getCF(cfId);
-        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version);
+        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
+    }
+
+    public File getCacheCrcPath(UUID cfId, String version)
+    {
+        Pair<String, String> names = Schema.instance.getCF(cfId);
+        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
     }
 
     public Writer getWriter(int keysToSave)
@@ -129,14 +136,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         long start = System.nanoTime();
 
         // modern format, allows both key and value (so key cache load can be purely sequential)
-        File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
-        if (path.exists())
+        File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
+        File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+        if (dataPath.exists() && crcPath.exists())
         {
             DataInputStream in = null;
             try
             {
-                logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+                logger.info(String.format("reading saved cache %s", dataPath));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -155,10 +163,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         put(entry.left, entry.right);
                 }
             }
+            catch (CorruptFileException e)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
+            }
             catch (Exception e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
-                logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
+                logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
             }
             finally
             {
@@ -167,7 +180,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         }
         if (logger.isDebugEnabled())
             logger.debug("completed reading ({} ms; {} keys) saved cache {}",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, path);
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, dataPath);
         return count;
     }
 
@@ -241,9 +254,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-            HashMap<UUID, OutputStream> streams = new HashMap<>();
-            HashMap<UUID, File> paths = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
+            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
             {
@@ -254,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    DataOutputPlus writer = writers.get(cfId);
+                    DataOutputPlus writer = dataOutputs.get(cfId);
                     if (writer == null)
                     {
-                        File writerPath = tempCacheFile(cfId);
-                        OutputStream stream;
+                        Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
+                        SequentialWriter sequentialWriter;
                         try
                         {
-                            stream = streamFactory.getOutputStream(writerPath);
-                            writer = new WrappedDataOutputStreamPlus(stream);
+                            sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
+                            writer = new WrappedDataOutputStreamPlus(sequentialWriter);
                         }
                         catch (FileNotFoundException e)
                         {
                             throw new RuntimeException(e);
                         }
-                        paths.put(cfId, writerPath);
-                        streams.put(cfId, stream);
-                        writers.put(cfId, writer);
+                        paths.put(cfId, cacheFilePaths);
+                        sequentialWriters.put(cfId, sequentialWriter);
+                        dataOutputs.put(cfId, writer);
                     }
 
                     try
@@ -279,7 +292,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     }
                     catch (IOException e)
                     {
-                        throw new FSWriteError(e, paths.get(cfId));
+                        throw new FSWriteError(e, paths.get(cfId).left);
                     }
 
                     keysWritten++;
@@ -299,29 +312,40 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (OutputStream writer : streams.values())
+                for (SequentialWriter writer : sequentialWriters.values())
+                {
+                    writer.finish();
                     FileUtils.closeQuietly(writer);
+                }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
             {
                 UUID cfId = entry.getKey();
 
-                File tmpFile = paths.get(cfId);
-                File cacheFile = getCachePath(cfId, CURRENT_VERSION);
+                Pair<File, File> tmpFiles = paths.get(cfId);
+                File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
+                File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
-                if (!tmpFile.renameTo(cacheFile))
-                    logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
+                crcFile.delete();
+
+                if (!tmpFiles.left.renameTo(cacheFile))
+                    logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
+
+                if (!tmpFiles.right.renameTo(crcFile))
+                    logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
             }
 
             logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private File tempCacheFile(UUID cfId)
+        private Pair<File, File> tempCacheFiles(UUID cfId)
         {
-            File path = getCachePath(cfId, CURRENT_VERSION);
-            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
+            File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
+            File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
+            return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
+                               FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
         }
 
         private void deleteOldCacheFiles()
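
Condensed, the per-table write path after this change is roughly the sketch
below (error handling and the key loop elided; the serialize() call is
assumed from the surrounding 2.2 code, so treat this as illustration, not a
drop-in snippet):

    Pair<File, File> tmp = tempCacheFiles(cfId);               // temp .db / .crc pair
    SequentialWriter sw = streamFactory.getOutputWriter(tmp.left, tmp.right);
    DataOutputPlus writer = new WrappedDataOutputStreamPlus(sw);
    // ... cacheLoader.serialize(key, writer) for each hot key ...
    sw.finish();                                               // flushes data and checksums
    // both temp files are then renamed over the current-version cache pair;
    // a mismatch on the next load surfaces as CorruptFileException and the
    // saved cache is simply treated as absent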

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5589bc2..e7c76ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1122,7 +1122,7 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_segment_size_in_mb * 1024 * 1024;
     }
-    
+
     public static void setCommitLogSegmentSize(int sizeMegabytes)
     {
         conf.commitlog_segment_size_in_mb = sizeMegabytes;
@@ -1271,7 +1271,7 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_sync_period_in_ms;
     }
-    
+
     public static void setCommitLogSyncPeriod(int periodMillis)
     {
         conf.commitlog_sync_period_in_ms = periodMillis;
@@ -1405,14 +1405,19 @@ public class DatabaseDescriptor
         return conf.max_hint_window_in_ms;
     }
 
-    public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version)
+    public static File getSerializedCachePath(String ksName,
+                                              String cfName,
+                                              UUID cfId,
+                                              CacheService.CacheType cacheType,
+                                              String version,
+                                              String extension)
     {
         StringBuilder builder = new StringBuilder();
         builder.append(ksName).append('-');
         builder.append(cfName).append('-');
         builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
         builder.append(cacheType);
-        builder.append((version == null ? "" : "-" + version + ".db"));
+        builder.append((version == null ? "" : "-" + version + "." + extension));
         return new File(conf.saved_caches_directory, builder.toString());
     }
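
With the new extension parameter, each saved cache is now a pair of sibling
files under saved_caches_directory. As a worked example (keyspace and table
names invented here, and assuming CacheType renders as e.g. "KeyCache"):

    system-local-<hex cfId>-KeyCache-c.db    <- cache data, version "c"
    system-local-<hex cfId>-KeyCache-c.crc   <- Adler32 checksums for the .db file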
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
new file mode 100644
index 0000000..60b193a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ChecksummedRandomAccessReader extends RandomAccessReader
+{
+    @SuppressWarnings("serial")
+    public static class CorruptFileException extends RuntimeException
+    {
+        public final File file;
+
+        public CorruptFileException(Exception cause, File file) {
+            super(cause);
+            this.file = file;
+        }
+    }
+
+    private final DataIntegrityMetadata.ChecksumValidator validator;
+    private final File file;
+
+    protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException {
+        super(channel, validator.chunkSize, -1, BufferType.ON_HEAP, null);
+        this.validator = validator;
+        this.file = file;
+    }
+
+    public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
+    {
+        try (ChannelProxy channel = new ChannelProxy(file))
+        {
+            RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
+            DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
+                                                                                                            crcReader,
+                                                                                                            file.getPath());
+            return new ChecksummedRandomAccessReader(file, channel, validator);
+        }
+    }
+
+    protected void reBuffer()
+    {
+        long desiredPosition = current();
+        // align with buffer size, as checksums were computed in chunks of buffer size each.
+        bufferOffset = (desiredPosition / buffer.capacity()) * buffer.capacity();
+
+        buffer.clear();
+
+        long position = bufferOffset;
+        while (buffer.hasRemaining())
+        {
+            int n = channel.read(buffer, position);
+            if (n < 0)
+                break;
+            position += n;
+        }
+
+        buffer.flip();
+
+        try
+        {
+            validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+        }
+        catch (IOException e)
+        {
+            throw new CorruptFileException(e, file);
+        }
+
+        buffer.position((int) (desiredPosition - bufferOffset));
+    }
+
+    public void seek(long newPosition)
+    {
+        validator.seek(newPosition);
+        super.seek(newPosition);
+    }
+
+    public void close()
+    {
+        super.close();
+        validator.close();
+    }
+}
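
Because checksums are computed per fixed-size chunk, reBuffer() aligns every
read to a chunk boundary and validates the whole chunk even for a one-byte
read. A worked sketch of that arithmetic, assuming the 64 KiB chunk size the
tests below imply (the real value is read from the crc file header):

    long chunkSize    = 65536;                                // from the crc header in practice
    long desired      = 66000;                                // position the caller asked for
    long bufferOffset = (desired / chunkSize) * chunkSize;    // = 65536, start of chunk 1
    // the chunk [65536, 131072) is read and checked against its stored
    // checksum, then the buffer is positioned at desired - bufferOffset = 464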

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..d44bd1c 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -48,14 +48,20 @@ public class DataIntegrityMetadata
     {
         private final Checksum checksum;
         private final RandomAccessReader reader;
-        private final Descriptor descriptor;
         public final int chunkSize;
+        private final String dataFilename;
 
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this.descriptor = descriptor;
-            checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create();
-            reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
+            this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(),
+                 RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
+                 descriptor.filenameFor(Component.DATA));
+        }
+
+        public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException {
+            this.checksum = checksum;
+            this.reader = reader;
+            this.dataFilename = dataFilename;
             chunkSize = reader.readInt();
         }
 
@@ -78,7 +84,7 @@ public class DataIntegrityMetadata
             checksum.reset();
             int actual = reader.readInt();
             if (current != actual)
-                throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
+                throw new IOException("Corrupted file: " + dataFilename);
         }
 
         public void close()
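
For reference, the checksum file consumed here is just a sequence of
big-endian ints: the chunk size, then one checksum per chunk of the data
file. A minimal sketch for dumping one by hand (layout inferred from
ChecksumValidator above; DumpCrcFile is not a tool that ships with
Cassandra):

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class DumpCrcFile
    {
        public static void main(String[] args) throws IOException
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream(args[0])))
            {
                // header: the chunk size the checksums were computed over
                int chunkSize = in.readInt();
                System.out.println("chunk size: " + chunkSize);
                // body: one int checksum (Adler32 for saved caches) per chunk
                int chunk = 0;
                while (in.available() >= 4)
                    System.out.printf("chunk %d checksum: 0x%08x%n", chunk++, in.readInt());
            }
        }
    }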

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
new file mode 100644
index 0000000..c1e43c9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
+import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class ChecksummedRandomAccessReaderTest
+{
+    @Test
+    public void readFully() throws IOException
+    {
+        final File data = File.createTempFile("testReadFully", "data");
+        final File crc = File.createTempFile("testReadFully", "crc");
+
+        final byte[] expected = new byte[70 * 1024];   // bit more than crc chunk size, so we can test rebuffering.
+        ThreadLocalRandom.current().nextBytes(expected);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test
+    public void seek() throws IOException
+    {
+        final File data = File.createTempFile("testSeek", "data");
+        final File crc = File.createTempFile("testSeek", "crc");
+
+        final byte[] dataBytes = new byte[70 * 1024];   // bit more than crc chunk size
+        ThreadLocalRandom.current().nextBytes(dataBytes);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(dataBytes);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+
+        final int seekPosition = 66000;
+        reader.seek(seekPosition);
+
+        byte[] b = new byte[dataBytes.length - seekPosition];
+        reader.readFully(b);
+
+        byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
+    public void corruptionDetection() throws IOException
+    {
+        final File data = File.createTempFile("corruptionDetection", "data");
+        final File crc = File.createTempFile("corruptionDetection", "crc");
+
+        final byte[] expected = new byte[5 * 1024];
+        Arrays.fill(expected, (byte) 0);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        // simulate corruption of file
+        try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
+        {
+            dataFile.seek(1024);
+            dataFile.write((byte) 5);
+        }
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+}
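
The new unit test should be runnable on its own with the usual single-test
target, something along the lines of:

    ant test -Dtest.name=ChecksummedRandomAccessReaderTest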