You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/27 22:47:19 UTC
[3/5] Clean up IOExceptions; patch by Aleksey Yeschenko and jbellis
for CASSANDRA-2116
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index bf6c87f..d1160d2 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -142,7 +141,7 @@ public abstract class AbstractCompactionStrategy
* allow for a more memory-efficient solution if we know the sstables don't overlap (see
* LeveledCompactionStrategy for instance).
*/
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
for (SSTableReader sstable : sstables)
@@ -150,7 +149,7 @@ public abstract class AbstractCompactionStrategy
return scanners;
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact) throws IOException
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
{
return getScanners(toCompact, null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 49c98d2..59becfe 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -40,7 +40,7 @@ public abstract class AbstractCompactionTask
this.compactionType = OperationType.COMPACTION;
}
- public abstract int execute(CompactionExecutorStatsCollector collector) throws IOException;
+ public abstract int execute(CompactionExecutorStatsCollector collector);
public ColumnFamilyStore getColumnFamilyStore()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 06ca1a2..b9bdfa7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -25,10 +25,16 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -47,12 +53,6 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterators;
-import com.google.common.primitives.Longs;
/**
* A singleton which manages a private executor of ongoing compactions. A readwrite lock
@@ -119,9 +119,9 @@ public class CompactionManager implements CompactionManagerMBean
new Object[] {cfs.table.name,
cfs.columnFamily,
cfs.getCompactionStrategy().getClass().getSimpleName()});
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- protected void runMayThrow() throws IOException
+ public void run()
{
compactionLock.readLock().lock();
try
@@ -228,7 +228,7 @@ public class CompactionManager implements CompactionManagerMBean
{
performAllSSTableOperation(cfStore, new AllSSTablesOperation()
{
- public void perform(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
+ public void perform(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
assert !cfs.isIndex();
for (final SSTableReader sstable : sstables)
@@ -623,11 +623,11 @@ public class CompactionManager implements CompactionManagerMBean
if (writer != null)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
}
- catch (Exception e)
+ catch (Throwable e)
{
if (writer != null)
writer.abort();
- throw FBUtilities.unchecked(e);
+ throw Throwables.propagate(e);
}
finally
{
@@ -655,8 +655,11 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
- throws IOException
+ public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
+ File compactionFileLocation,
+ int expectedBloomFilterSize,
+ SSTableWriter writer,
+ Collection<SSTableReader> sstables)
{
if (writer == null)
{
@@ -777,9 +780,9 @@ public class CompactionManager implements CompactionManagerMBean
public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
{
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- public void runMayThrow() throws IOException
+ public void run()
{
if (!AutoSavingCache.flushInProgress.compareAndSet(false, true))
{
@@ -809,9 +812,9 @@ public class CompactionManager implements CompactionManagerMBean
public Future<?> submitTruncate(final ColumnFamilyStore main, final long truncatedAt)
{
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- public void runMayThrow() throws InterruptedException, IOException
+ public void run()
{
compactionLock.writeLock().lock();
@@ -841,7 +844,7 @@ public class CompactionManager implements CompactionManagerMBean
private static class ValidationCompactionIterable extends CompactionIterable
{
- public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+ public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range)
{
super(OperationType.VALIDATION,
cfs.getCompactionStrategy().getScanners(sstables, range),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 468c9a9..7382fa9 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,10 +22,11 @@ import java.io.IOException;
import java.util.*;
import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -60,7 +61,7 @@ public class CompactionTask extends AbstractCompactionTask
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
- public int execute(CompactionExecutorStatsCollector collector) throws IOException
+ public int execute(CompactionExecutorStatsCollector collector)
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
@@ -180,15 +181,23 @@ public class CompactionTask extends AbstractCompactionTask
}
}
}
- catch (Exception e)
+ catch (Throwable t)
{
for (SSTableWriter writer : writers)
writer.abort();
- throw FBUtilities.unchecked(e);
+ throw Throwables.propagate(t);
}
finally
{
- iter.close();
+ try
+ {
+ iter.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
if (collector != null)
collector.finishCompaction(ci);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 122326e..2c1f02a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.compaction;
import java.io.DataOutput;
-import java.io.IOError;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Iterator;
@@ -26,18 +25,18 @@ import java.util.List;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterators;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.utils.StreamingHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.StreamingHistogram;
/**
* LazilyCompactedRow only computes the row bloom filter and column index in memory
@@ -88,8 +87,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
catch (IOException e)
{
- // Since we don't write on disk this time, we should get this
- throw new AssertionError();
+ throw new RuntimeException(e);
}
// reach into the reducer used during iteration to get column count, size, max column timestamp
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
@@ -149,7 +147,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new AssertionError(e);
}
Iterator<OnDiskAtom> iter = iterator();
@@ -205,7 +203,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new RuntimeException(e);
}
}
closed = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9ac4fce..c352832 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -178,7 +178,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return Sets.difference(L0, sstablesToIgnore).size() + manifest.getLevelCount() > 20;
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
for (SSTableReader sstable : sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ef290f9..ebc91d7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -39,7 +39,7 @@ public class LeveledCompactionTask extends CompactionTask
}
@Override
- public int execute(CompactionManager.CompactionExecutorStatsCollector collector) throws IOException
+ public int execute(CompactionManager.CompactionExecutorStatsCollector collector)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 7ff4350..9e1e8e4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.compaction;
import java.io.File;
-import java.io.IOError;
import java.io.IOException;
import java.util.*;
@@ -34,6 +33,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -536,19 +536,23 @@ public class LeveledManifest
g.writeEndArray(); // for field generations
g.writeEndObject(); // write global object
g.close();
-
- if (oldFile.exists() && manifestFile.exists())
- FileUtils.deleteWithConfirm(oldFile);
- if (manifestFile.exists())
- FileUtils.renameWithConfirm(manifestFile, oldFile);
- assert tmpFile.exists();
- FileUtils.renameWithConfirm(tmpFile, manifestFile);
- logger.debug("Saved manifest {}", manifestFile);
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSWriteError(e, tmpFile);
}
+
+ if (oldFile.exists() && manifestFile.exists())
+ FileUtils.deleteWithConfirm(oldFile);
+
+ if (manifestFile.exists())
+ FileUtils.renameWithConfirm(manifestFile, oldFile);
+
+ assert tmpFile.exists();
+
+ FileUtils.renameWithConfirm(tmpFile, manifestFile);
+
+ logger.debug("Saved manifest {}", manifestFile);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index b6f604e..47dc88a 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -55,7 +55,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private final int maxInMemorySize;
- public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) throws IOException
+ public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
{
this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 7d4ae4b..292989a 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -18,16 +18,15 @@
package org.apache.cassandra.db.compaction;
import java.io.DataOutput;
-import java.io.IOError;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.List;
-import org.apache.cassandra.io.sstable.ColumnStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.HeapAllocator;
@@ -108,7 +107,7 @@ public class PrecompactedRow extends AbstractCompactedRow
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new RuntimeException(e);
}
if (cf == null)
@@ -174,7 +173,7 @@ public class PrecompactedRow extends AbstractCompactedRow
* We do not provide this method for other AbstractCompactedRow, because this fits the whole row into
* memory and doesn't make sense for those other implementations.
*/
- public ColumnFamily getFullColumnFamily() throws IOException
+ public ColumnFamily getFullColumnFamily()
{
return compactedCf;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 4841a03..afff988 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.io.*;
import java.util.*;
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.*;
@@ -255,11 +257,11 @@ public class Scrubber implements Closeable
if (writer.getFilePointer() > 0)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
}
- catch (Exception e)
+ catch (Throwable t)
{
if (writer != null)
writer.abort();
- throw FBUtilities.unchecked(e);
+ throw Throwables.propagate(t);
}
if (!outOfOrderRows.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index be95b19..2d8286e 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.index;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.DecoratedKey;
@@ -38,7 +37,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+ public abstract void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
/**
* insert a column to the index
@@ -47,7 +46,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+ public abstract void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
/**
* update a column from the index
@@ -56,7 +55,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+ public abstract void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
public String getNameForSystemTable(ByteBuffer column)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index 63c2657..f2a0f60 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.index;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.List;
@@ -42,13 +41,11 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex
* @param cf the current rows data
* @param mutatedIndexedColumns the set of columns that were changed or added
* @param oldIndexedColumns the columns which were deleted
- * @throws IOException
*/
public abstract void applyIndexUpdates(ByteBuffer rowKey,
ColumnFamily cf,
SortedSet<ByteBuffer> mutatedIndexedColumns,
- ColumnFamily oldIndexedColumns) throws IOException;
-
+ ColumnFamily oldIndexedColumns);
/**
* cleans up deleted columns from cassandra cleanup compaction
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index c0f2543..9141cf8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -17,11 +17,15 @@
*/
package org.apache.cassandra.db.index;
-import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.util.*;
import java.util.concurrent.*;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -32,9 +36,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Abstract base class for different types of secondary indexes.
@@ -116,9 +117,8 @@ public abstract class SecondaryIndex
/**
* Forces this index's in-memory data to disk
- * @throws IOException
*/
- public abstract void forceBlockingFlush() throws IOException;
+ public abstract void forceBlockingFlush();
/**
* Get current amount of memory this index is consuming (in bytes)
@@ -136,7 +136,7 @@ public abstract class SecondaryIndex
* Delete all files and references to this index
* @param columnName the indexed column to remove
*/
- public abstract void removeIndex(ByteBuffer columnName) throws IOException;
+ public abstract void removeIndex(ByteBuffer columnName);
/**
* Remove the index and unregisters this index's mbean if one exists
@@ -154,7 +154,7 @@ public abstract class SecondaryIndex
* Builds the index using the data in the underlying CFS
* Blocks till it's complete
*/
- protected void buildIndexBlocking() throws IOException
+ protected void buildIndexBlocking()
{
logger.info(String.format("Submitting index build of %s for data in %s",
getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
@@ -191,7 +191,11 @@ public abstract class SecondaryIndex
}
catch (ExecutionException e)
{
- throw new IOException(e);
+ throw new RuntimeException(e);
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
}
finally
{
@@ -232,6 +236,7 @@ public abstract class SecondaryIndex
try
{
baseCfs.forceBlockingFlush();
+ buildIndexBlocking();
}
catch (ExecutionException e)
{
@@ -241,15 +246,6 @@ public abstract class SecondaryIndex
{
throw new AssertionError(e);
}
-
- try
- {
- buildIndexBlocking();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
}
};
FutureTask<?> f = new FutureTask<Object>(runnable, null);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index ce1469e..1189d72 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.index;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -29,6 +28,10 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.*;
@@ -80,9 +83,8 @@ public class SecondaryIndexManager
/**
* Drops and adds new indexes associated with the underlying CF
- * @throws IOException
*/
- public void reload() throws IOException
+ public void reload()
{
// figure out what needs to be added and dropped.
// future: if/when we have modifiable settings for secondary indexes,
@@ -115,9 +117,8 @@ public class SecondaryIndexManager
*
* @param sstables the data to build from
* @param columns the list of columns to index, ordered by comparator
- * @throws IOException
*/
- public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns) throws IOException
+ public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
{
if (columns.isEmpty())
return;
@@ -130,7 +131,6 @@ public class SecondaryIndexManager
try
{
future.get();
- flushIndexesBlocking();
}
catch (InterruptedException e)
{
@@ -141,6 +141,8 @@ public class SecondaryIndexManager
throw new RuntimeException(e);
}
+ flushIndexesBlocking();
+
logger.info("Index build of " + baseCfs.metadata.comparator.getString(columns) + " complete");
}
@@ -175,9 +177,8 @@ public class SecondaryIndexManager
/**
* Removes an existing index
* @param column the indexed column to remove
- * @throws IOException
*/
- public void removeIndexedColumn(ByteBuffer column) throws IOException
+ public void removeIndexedColumn(ByteBuffer column)
{
SecondaryIndex index = indexesByColumn.remove(column);
@@ -279,10 +280,8 @@ public class SecondaryIndexManager
/**
* Flush all indexes to disk
- * @throws ExecutionException
- * @throws InterruptedException
*/
- public void flushIndexesBlocking() throws IOException
+ public void flushIndexesBlocking()
{
for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
entry.getValue().forceBlockingFlush();
@@ -415,12 +414,11 @@ public class SecondaryIndexManager
* @param cf the current rows data
* @param mutatedIndexedColumns the set of columns that were changed or added
* @param oldIndexedColumns the columns that were deleted
- * @throws IOException
*/
public void applyIndexUpdates(ByteBuffer rowKey,
ColumnFamily cf,
SortedSet<ByteBuffer> mutatedIndexedColumns,
- ColumnFamily oldIndexedColumns) throws IOException
+ ColumnFamily oldIndexedColumns)
{
// Identify the columns with PerRowSecondaryIndexes
@@ -504,7 +502,7 @@ public class SecondaryIndexManager
* @param key the row key
* @param indexedColumnsInRow all column names in row
*/
- public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRow) throws IOException
+ public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRow)
{
// Identify the columns with isRowLevelIndex == true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index b5a46a8..7147bed 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -17,11 +17,13 @@
*/
package org.apache.cassandra.db.index.keys;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
@@ -34,8 +36,6 @@ import org.apache.cassandra.db.marshal.LocalByPartionerType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Implements a secondary index for a column family using a second column family
@@ -125,12 +125,12 @@ public class KeysIndex extends PerColumnSecondaryIndex
insertColumn(valueKey, rowKey, col);
}
- public void removeIndex(ByteBuffer columnName) throws IOException
+ public void removeIndex(ByteBuffer columnName)
{
indexCfs.invalidate();
}
- public void forceBlockingFlush() throws IOException
+ public void forceBlockingFlush()
{
try
{
@@ -138,11 +138,11 @@ public class KeysIndex extends PerColumnSecondaryIndex
}
catch (ExecutionException e)
{
- throw new IOException(e);
+ throw new RuntimeException(e);
}
catch (InterruptedException e)
{
- throw new IOException(e);
+ throw new AssertionError(e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index dfe79e2..1c8283a 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.marshal;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
@@ -277,7 +278,14 @@ public class CompositeType extends AbstractCompositeType
DataOutputBuffer out = new DataOutputBuffer(serializedSize);
for (int i = 0; i < components.size(); i++)
{
- ByteBufferUtil.writeWithShortLength(components.get(i), out);
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(components.get(i), out);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
out.write(endOfComponents[i]);
}
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e21a69d..952328b 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.BoundedStatsDeque;
import org.apache.cassandra.utils.FBUtilities;
@@ -110,16 +111,17 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
*/
public void dumpInterArrivalTimes()
{
+ File file = FileUtils.createTempFile("failuredetector-", ".dat");
+
OutputStream os = null;
try
{
- File file = File.createTempFile("failuredetector-", ".dat");
os = new BufferedOutputStream(new FileOutputStream(file, true));
os.write(toString().getBytes());
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSWriteError(e, file);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 9af28de..a45a574 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.hadoop;
-
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@@ -27,6 +26,9 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -46,9 +48,6 @@ import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
@@ -128,7 +127,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
}
}
- private void prepareWriter() throws IOException
+ private void prepareWriter()
{
if (writer == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/FSError.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/FSError.java b/src/java/org/apache/cassandra/io/FSError.java
new file mode 100644
index 0000000..e09bac7
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/FSError.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOError;
+
+public abstract class FSError extends IOError
+{
+ public final File path;
+
+ public FSError(Throwable cause, File path)
+ {
+ super(cause);
+ this.path = path;
+ }
+
+ /**
+ * Unwraps the Throwable cause chain looking for an FSError instance
+ * @param top the top-level Throwable to unwrap
+ * @return FSError if found any, null otherwise
+ */
+ public static FSError findNested(Throwable top)
+ {
+ for (Throwable t = top; t != null; t = t.getCause())
+ {
+ if (t instanceof FSError)
+ return (FSError) t;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/FSReadError.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/FSReadError.java b/src/java/org/apache/cassandra/io/FSReadError.java
new file mode 100644
index 0000000..c180eb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/FSReadError.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.File;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+public class FSReadError extends FSError
+{
+ public FSReadError(Throwable cause, File path)
+ {
+ super(cause, path);
+ }
+
+ public FSReadError(Throwable cause, String path)
+ {
+ this(cause, new File(path));
+ }
+
+ public FSReadError(Throwable cause, Descriptor descriptor)
+ {
+ this(cause, descriptor.baseFilename());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/FSWriteError.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/FSWriteError.java b/src/java/org/apache/cassandra/io/FSWriteError.java
new file mode 100644
index 0000000..9346f62
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/FSWriteError.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.File;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+public class FSWriteError extends FSError
+{
+ public FSWriteError(Throwable cause, File path)
+ {
+ super(cause, path);
+ }
+
+ public FSWriteError(Throwable cause, String path)
+ {
+ this(cause, new File(path));
+ }
+
+ public FSWriteError(Throwable cause, Descriptor descriptor)
+ {
+ this(cause, descriptor.baseFilename());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 5483e46..2bc0c44 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -17,32 +17,43 @@
*/
package org.apache.cassandra.io.compress;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
// TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer"
public class CompressedRandomAccessReader extends RandomAccessReader
{
private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class);
- public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata) throws IOException
+ public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata)
{
return open(dataFilePath, metadata, false);
}
- public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
+ public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache)
{
- return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache);
+ try
+ {
+ return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
private final CompressionMetadata metadata;
@@ -58,7 +69,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
private final FileInputStream source;
private final FileChannel channel;
- public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
+ public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws FileNotFoundException
{
super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
this.metadata = metadata;
@@ -66,14 +77,34 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// can't use super.read(...) methods
// that is why we are allocating special InputStream to read data from disk
// from already open file descriptor
- source = new FileInputStream(getFD());
+ try
+ {
+ source = new FileInputStream(getFD());
+ }
+ catch (IOException e)
+ {
+ // fd == null, Not Supposed To Happen
+ throw new RuntimeException(e);
+ }
+
channel = source.getChannel(); // for position manipulation
}
@Override
- protected void reBuffer() throws IOException
+ protected void reBuffer()
{
- decompressChunk(metadata.chunkFor(current));
+ try
+ {
+ decompressChunk(metadata.chunkFor(current));
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
}
private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
@@ -85,16 +116,23 @@ public class CompressedRandomAccessReader extends RandomAccessReader
compressed = new byte[chunk.length];
if (source.read(compressed, 0, chunk.length) != chunk.length)
- throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
+ throw new CorruptBlockException(getPath(), chunk);
- validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
+ try
+ {
+ validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
if (metadata.parameters.crcChance > FBUtilities.threadLocalRandom().nextDouble())
{
checksum.update(buffer, 0, validBufferBytes);
if (checksum(chunk) != (int) checksum.getValue())
- throw new CorruptedBlockException(getPath(), chunk);
+ throw new CorruptBlockException(getPath(), chunk);
// reset checksum object back to the original (blank) state
checksum.reset();
@@ -109,16 +147,13 @@ public class CompressedRandomAccessReader extends RandomAccessReader
assert channel.position() == chunk.offset + chunk.length;
if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length)
- throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.",
- getPath(),
- chunk.offset,
- chunk.length));
+ throw new CorruptBlockException(getPath(), chunk);
return Ints.fromByteArray(checksumBytes);
}
@Override
- public long length() throws IOException
+ public long length()
{
return metadata.dataLength;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index fdcf1b8..6d78287 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -28,7 +28,11 @@ import org.apache.cassandra.io.util.SequentialWriter;
public class CompressedSequentialWriter extends SequentialWriter
{
- public static SequentialWriter open(String dataFilePath, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
+ public static SequentialWriter open(String dataFilePath,
+ String indexFilePath,
+ boolean skipIOCache,
+ CompressionParameters parameters,
+ Collector sstableMetadataCollector)
{
return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
}
@@ -53,7 +57,11 @@ public class CompressedSequentialWriter extends SequentialWriter
private final Collector sstableMetadataCollector;
- public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
+ public CompressedSequentialWriter(File file,
+ String indexFilePath,
+ boolean skipIOCache,
+ CompressionParameters parameters,
+ Collector sstableMetadataCollector)
{
super(file, parameters.chunkLength(), skipIOCache);
this.compressor = parameters.sstableCompressor;
@@ -62,8 +70,9 @@ public class CompressedSequentialWriter extends SequentialWriter
compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
/* Index File (-CompressionInfo.db component) and it's header */
- metadataWriter = new CompressionMetadata.Writer(indexFilePath);
+ metadataWriter = CompressionMetadata.Writer.open(indexFilePath);
metadataWriter.writeHeader(parameters);
+
this.sstableMetadataCollector = sstableMetadataCollector;
}
@@ -161,7 +170,7 @@ public class CompressedSequentialWriter extends SequentialWriter
checksum.update(buffer, 0, validBytes);
if (out.readInt() != (int) checksum.getValue())
- throw new CorruptedBlockException(getPath(), chunkOffset, chunkSize);
+ throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
checksum.reset();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 4c7083f..a68de8a 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -24,8 +24,11 @@ import com.google.common.primitives.Longs;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.BigLongArray;
@@ -56,48 +59,57 @@ public class CompressionMetadata
public static CompressionMetadata create(String dataFilePath)
{
Descriptor desc = Descriptor.fromFilename(dataFilePath);
-
- try
- {
- return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
}
// This is package protected because of the tests.
- CompressionMetadata(String indexFilePath, long compressedLength) throws IOException
+ CompressionMetadata(String indexFilePath, long compressedLength)
{
this.indexFilePath = indexFilePath;
- DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath));
-
- String compressorName = stream.readUTF();
- int optionCount = stream.readInt();
- Map<String, String> options = new HashMap<String, String>();
- for (int i = 0; i < optionCount; ++i)
- {
- String key = stream.readUTF();
- String value = stream.readUTF();
- options.put(key, value);
- }
- int chunkLength = stream.readInt();
+ DataInputStream stream;
try
{
- parameters = new CompressionParameters(compressorName, chunkLength, options);
+ stream = new DataInputStream(new FileInputStream(indexFilePath));
}
- catch (ConfigurationException e)
+ catch (FileNotFoundException e)
{
- throw new RuntimeException("Cannot create CompressionParameters for stored parameters", e);
+ throw new RuntimeException(e);
}
- dataLength = stream.readLong();
- compressedFileLength = compressedLength;
- chunkOffsets = readChunkOffsets(stream);
+ try
+ {
+ String compressorName = stream.readUTF();
+ int optionCount = stream.readInt();
+ Map<String, String> options = new HashMap<String, String>();
+ for (int i = 0; i < optionCount; ++i)
+ {
+ String key = stream.readUTF();
+ String value = stream.readUTF();
+ options.put(key, value);
+ }
+ int chunkLength = stream.readInt();
+ try
+ {
+ parameters = new CompressionParameters(compressorName, chunkLength, options);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException("Cannot create CompressionParameters for stored parameters", e);
+ }
- FileUtils.closeQuietly(stream);
+ dataLength = stream.readLong();
+ compressedFileLength = compressedLength;
+ chunkOffsets = readChunkOffsets(stream);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptSSTableException(e, indexFilePath);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(stream);
+ }
}
public ICompressor compressor()
@@ -116,30 +128,34 @@ public class CompressionMetadata
* @param input Source of the data.
*
* @return collection of the chunk offsets.
- *
- * @throws java.io.IOException on any I/O error (except EOF).
*/
- private BigLongArray readChunkOffsets(DataInput input) throws IOException
+ private BigLongArray readChunkOffsets(DataInput input)
{
- int chunkCount = input.readInt();
- BigLongArray offsets = new BigLongArray(chunkCount);
-
- for (int i = 0; i < chunkCount; i++)
+ try
{
- try
- {
- offsets.set(i, input.readLong());
- }
- catch (EOFException e)
+ int chunkCount = input.readInt();
+ BigLongArray offsets = new BigLongArray(chunkCount);
+
+ for (int i = 0; i < chunkCount; i++)
{
- throw new EOFException(String.format("Corrupted Index File %s: read %d but expected %d chunks.",
- indexFilePath,
- i,
- chunkCount));
+ try
+ {
+ offsets.set(i, input.readLong());
+ }
+ catch (EOFException e)
+ {
+ String msg = String.format("Corrupted Index File %s: read %d but expected %d chunks.",
+ indexFilePath, i, chunkCount);
+ throw new CorruptSSTableException(new IOException(msg, e), indexFilePath);
+ }
}
- }
- return offsets;
+ return offsets;
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, indexFilePath);
+ }
}
/**
@@ -147,15 +163,14 @@ public class CompressionMetadata
*
* @param position Position in the file.
* @return pair of chunk offset and length.
- * @throws java.io.IOException on any I/O error.
*/
- public Chunk chunkFor(long position) throws IOException
+ public Chunk chunkFor(long position)
{
// position of the chunk
int idx = (int) (position / parameters.chunkLength());
if (idx >= chunkOffsets.size)
- throw new EOFException();
+ throw new CorruptSSTableException(new EOFException(), indexFilePath);
long chunkOffset = chunkOffsets.get(idx);
long nextChunkOffset = (idx + 1 == chunkOffsets.size)
@@ -199,46 +214,83 @@ public class CompressionMetadata
{
// place for uncompressed data length in the index file
private long dataLengthOffset = -1;
+ // path to the file
+ private final String filePath;
- public Writer(String path) throws IOException
+ private Writer(String path) throws FileNotFoundException
{
super(path, "rw");
+ filePath = path;
}
- public void writeHeader(CompressionParameters parameters) throws IOException
+ public static Writer open(String path)
{
- // algorithm
- writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
- writeInt(parameters.otherOptions.size());
- for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
+ try
{
- writeUTF(entry.getKey());
- writeUTF(entry.getValue());
+ return new Writer(path);
}
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
- // store the length of the chunk
- writeInt(parameters.chunkLength());
- // store position and reserve a place for uncompressed data length and chunks count
- dataLengthOffset = getFilePointer();
- writeLong(-1);
- writeInt(-1);
+ public void writeHeader(CompressionParameters parameters)
+ {
+ try
+ {
+ writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+ writeInt(parameters.otherOptions.size());
+ for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
+ {
+ writeUTF(entry.getKey());
+ writeUTF(entry.getValue());
+ }
+
+ // store the length of the chunk
+ writeInt(parameters.chunkLength());
+ // store position and reserve a place for uncompressed data length and chunks count
+ dataLengthOffset = getFilePointer();
+ writeLong(-1);
+ writeInt(-1);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, filePath);
+ }
}
- public void finalizeHeader(long dataLength, int chunks) throws IOException
+ public void finalizeHeader(long dataLength, int chunks)
{
assert dataLengthOffset != -1 : "writeHeader wasn't called";
- long currentPosition = getFilePointer();
+ long currentPosition;
+ try
+ {
+ currentPosition = getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filePath);
+ }
+
+ try
+ {
- // seek back to the data length position
- seek(dataLengthOffset);
+ // seek back to the data length position
+ seek(dataLengthOffset);
- // write uncompressed data length and chunks count
- writeLong(dataLength);
- writeInt(chunks);
+ // write uncompressed data length and chunks count
+ writeLong(dataLength);
+ writeInt(chunks);
- // seek forward to the previous position
- seek(currentPosition);
+ // seek forward to the previous position
+ seek(currentPosition);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, filePath);
+ }
}
/**
@@ -247,30 +299,35 @@ public class CompressionMetadata
* @param chunkIndex Index of the chunk.
*
* @return offset of the chunk in the compressed file.
- *
- * @throws IOException any I/O error.
*/
- public long chunkOffsetBy(int chunkIndex) throws IOException
+ public long chunkOffsetBy(int chunkIndex)
{
if (dataLengthOffset == -1)
throw new IllegalStateException("writeHeader wasn't called");
- long position = getFilePointer();
-
- // seek to the position of the given chunk
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
-
try
{
- return readLong();
+ long position = getFilePointer();
+
+ // seek to the position of the given chunk
+ seek(dataLengthOffset
+ + 8 // size reserved for uncompressed data length
+ + 4 // size reserved for chunk count
+ + (chunkIndex * 8L));
+
+ try
+ {
+ return readLong();
+ }
+ finally
+ {
+ // back to the original position
+ seek(position);
+ }
}
- finally
+ catch (IOException e)
{
- // back to the original position
- seek(position);
+ throw new FSReadError(e, filePath);
}
}
@@ -278,13 +335,20 @@ public class CompressionMetadata
* Reset the writer so that the next chunk offset written will be the
* one of {@code chunkIndex}.
*/
- public void resetAndTruncate(int chunkIndex) throws IOException
+ public void resetAndTruncate(int chunkIndex)
{
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
- getChannel().truncate(getFilePointer());
+ try
+ {
+ seek(dataLengthOffset
+ + 8 // size reserved for uncompressed data length
+ + 4 // size reserved for chunk count
+ + (chunkIndex * 8L));
+ getChannel().truncate(getFilePointer());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, filePath);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java b/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java
new file mode 100644
index 0000000..60b4d1f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+
+public class CorruptBlockException extends IOException
+{
+ public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk)
+ {
+ this(filePath, chunk.offset, chunk.length);
+ }
+
+ public CorruptBlockException(String filePath, long offset, int length)
+ {
+ super(String.format("(%s): corruption detected, chunk at %d of length %d.", filePath, offset, length));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/compress/CorruptedBlockException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CorruptedBlockException.java b/src/java/org/apache/cassandra/io/compress/CorruptedBlockException.java
deleted file mode 100644
index d694de1..0000000
--- a/src/java/org/apache/cassandra/io/compress/CorruptedBlockException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.compress;
-
-import java.io.IOException;
-
-public class CorruptedBlockException extends IOException
-{
- public CorruptedBlockException(String filePath, CompressionMetadata.Chunk chunk)
- {
- this(filePath, chunk.offset, chunk.length);
- }
-
- public CorruptedBlockException(String filePath, long offset, int length)
- {
- super(String.format("(%s): corruption detected, chunk at %d of length %d.", filePath, offset, length));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 96c888d..d1727f7 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -48,7 +48,7 @@ public abstract class AbstractSSTableSimpleWriter
DatabaseDescriptor.setPartitioner(partitioner);
}
- protected SSTableWriter getWriter() throws IOException
+ protected SSTableWriter getWriter()
{
return new SSTableWriter(
makeFilename(directory, metadata.ksName, metadata.cfName),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java b/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
new file mode 100644
index 0000000..61cead4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+
+public class CorruptSSTableException extends RuntimeException
+{
+ public final File path;
+
+ public CorruptSSTableException(Exception cause, File path)
+ {
+ super(cause);
+ this.path = path;
+ }
+
+ public CorruptSSTableException(Exception cause, String path)
+ {
+ this(cause, new File(path));
+ }
+
+ public CorruptSSTableException(Exception cause, Descriptor descriptor)
+ {
+ this(cause, descriptor.baseFilename());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 4f776d6..d03c2e8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -204,7 +204,7 @@ public class Descriptor
return filenameFor(component.name());
}
- private String baseFilename()
+ public String baseFilename()
{
StringBuilder buff = new StringBuilder();
buff.append(directory).append(File.separatorChar);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index e0cb0f3..e581b22 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.IOError;
import java.io.IOException;
import com.google.common.collect.AbstractIterator;
@@ -38,14 +37,8 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
public KeyIterator(Descriptor desc)
{
this.desc = desc;
- try
- {
- in = RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), true);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ in = RandomAccessReader.open(path, true);
}
protected DecoratedKey computeNext()
@@ -60,11 +53,11 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new RuntimeException(e);
}
}
- public void close() throws IOException
+ public void close()
{
in.close();
}
@@ -76,13 +69,6 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
public long getTotalBytes()
{
- try
- {
- return in.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ return in.length();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index db7a9f8..8b4bafb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -129,7 +129,7 @@ public abstract class SSTable
*
* @return true if the file was deleted
*/
- public static boolean delete(Descriptor desc, Set<Component> components) throws IOException
+ public static boolean delete(Descriptor desc, Set<Component> components)
{
// remove the DATA component first if it exists
if (components.contains(Component.DATA))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index 36886db..788f4ca 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -43,15 +42,7 @@ public class SSTableBoundedScanner extends SSTableScanner
if (rangeIterator.hasNext())
{
currentRange = rangeIterator.next();
- try
- {
- dfile.seek(currentRange.left);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new RuntimeException(e);
- }
+ dfile.seek(currentRange.left);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index e7ebb27..2335b7d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -31,9 +30,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.WrappedRunnable;
-public class SSTableDeletingTask extends WrappedRunnable
+public class SSTableDeletingTask implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class);
@@ -65,7 +63,7 @@ public class SSTableDeletingTask extends WrappedRunnable
StorageService.tasks.submit(this);
}
- protected void runMayThrow() throws IOException
+ public void run()
{
// If we can't successfully delete the DATA component, set the task to be retried later: see above
File datafile = new File(desc.filenameFor(Component.DATA));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 67f95d7..337d6b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -53,6 +53,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
private final int expireBefore;
private final boolean validateColumns;
+ private final String filename;
/**
* Used to iterate through the columns of a row.
@@ -64,7 +65,6 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
* @throws IOException
*/
public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize)
- throws IOException
{
this(sstable, file, key, dataStart, dataSize, false);
}
@@ -80,24 +80,31 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
* @throws IOException
*/
public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
- throws IOException
{
- this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
+ this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
}
// Must only be used against current file format
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
- throws IOException
+ public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
{
- this(metadata, file, key, dataStart, dataSize, false, null, flag);
+ this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
}
// sstable may be null *if* checkData is false
// If it is null, we assume the data is in the current file format
- private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, IColumnSerializer.Flag flag)
- throws IOException
+ private SSTableIdentityIterator(CFMetaData metadata,
+ DataInput input,
+ String filename,
+ DecoratedKey key,
+ long dataStart,
+ long dataSize,
+ boolean checkData,
+ SSTableReader sstable,
+ IColumnSerializer.Flag flag)
{
+ assert !checkData || (sstable != null);
this.input = input;
+ this.filename = filename;
this.inputWithTracker = new BytesReadTracker(input);
this.key = key;
this.dataStart = dataStart;
@@ -128,10 +135,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
throw (EOFException) e;
logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
- // deFreeze should have left the file position ready to deserialize index
}
+
try
{
+ // deFreeze should have left the file position ready to deserialize index
IndexHelper.deserializeIndex(file);
}
catch (Exception e)
@@ -152,14 +160,13 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
atomSerializer = columnFamily.getOnDiskSerializer();
columnCount = inputWithTracker.readInt();
-
columnPosition = dataStart + inputWithTracker.getBytesRead();
}
catch (IOException e)
{
if (sstable != null)
sstable.markSuspect();
- throw new IOError(e);
+ throw new CorruptSSTableException(e, filename);
}
}
@@ -189,11 +196,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new CorruptSSTableException(e, filename);
}
- catch (MarshalException e)
+ catch (MarshalException me)
{
- throw new IOError(new IOException("Error validating row " + key, e));
+ throw new CorruptSSTableException(me, filename);
}
}
@@ -202,7 +209,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
throw new UnsupportedOperationException();
}
- public void close() throws IOException
+ public void close()
{
// creator is responsible for closing file when finished
}
@@ -246,7 +253,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
}
catch (MarshalException e)
{
- throw new IOException("Error validating row " + key, e);
+ throw new RuntimeException("Error validating row " + key, e);
}
}
return cf;
@@ -268,14 +275,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
throw new UnsupportedOperationException();
RandomAccessReader file = (RandomAccessReader) input;
- try
- {
- file.seek(columnPosition);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ file.seek(columnPosition);
inputWithTracker.reset(headerSize());
}