You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/27 22:47:19 UTC
[2/5] git commit: clean up ioexceptions patch by Aleksey Yeschenko
and jbellis for CASSANDRA-2116
clean up ioexceptions
patch by Aleksey Yeschenko and jbellis for CASSANDRA-2116
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/debb15ed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/debb15ed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/debb15ed
Branch: refs/heads/trunk
Commit: debb15ed14aeff1f9bd1fc859cff10ee7248cf8c
Parents: acedc34
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jul 27 15:45:27 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jul 27 15:45:27 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 25 +-
.../apache/cassandra/cache/SerializingCache.java | 10 +-
.../cassandra/cache/SerializingCacheProvider.java | 31 +-
.../org/apache/cassandra/config/CFMetaData.java | 9 +-
.../cassandra/config/DatabaseDescriptor.java | 33 ++-
.../org/apache/cassandra/config/KSMetaData.java | 13 +-
.../apache/cassandra/db/CollationController.java | 13 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 77 +----
.../org/apache/cassandra/db/ColumnSerializer.java | 12 +-
src/java/org/apache/cassandra/db/DataTracker.java | 9 +-
src/java/org/apache/cassandra/db/DefsTable.java | 13 +-
src/java/org/apache/cassandra/db/Directories.java | 71 ++---
src/java/org/apache/cassandra/db/Memtable.java | 15 +-
.../apache/cassandra/db/ReadRepairVerbHandler.java | 18 +-
src/java/org/apache/cassandra/db/RowMutation.java | 5 +-
.../apache/cassandra/db/SliceFromReadCommand.java | 7 +-
src/java/org/apache/cassandra/db/SuperColumn.java | 19 +-
src/java/org/apache/cassandra/db/SystemTable.java | 41 +--
src/java/org/apache/cassandra/db/Table.java | 54 +---
.../apache/cassandra/db/TruncateVerbHandler.java | 41 +--
.../db/columniterator/IndexedSliceReader.java | 28 +-
.../db/columniterator/SSTableNamesIterator.java | 8 +-
.../db/columniterator/SimpleSliceReader.java | 6 +-
.../commitlog/BatchCommitLogExecutorService.java | 12 +-
.../apache/cassandra/db/commitlog/CommitLog.java | 61 ++--
.../cassandra/db/commitlog/CommitLogAllocator.java | 13 +-
.../cassandra/db/commitlog/CommitLogReplayer.java | 12 +-
.../cassandra/db/commitlog/CommitLogSegment.java | 34 +-
.../db/compaction/AbstractCompactionStrategy.java | 5 +-
.../db/compaction/AbstractCompactionTask.java | 2 +-
.../cassandra/db/compaction/CompactionManager.java | 41 ++-
.../cassandra/db/compaction/CompactionTask.java | 19 +-
.../db/compaction/LazilyCompactedRow.java | 16 +-
.../db/compaction/LeveledCompactionStrategy.java | 2 +-
.../db/compaction/LeveledCompactionTask.java | 2 +-
.../cassandra/db/compaction/LeveledManifest.java | 24 +-
.../db/compaction/ParallelCompactionIterable.java | 2 +-
.../cassandra/db/compaction/PrecompactedRow.java | 7 +-
.../apache/cassandra/db/compaction/Scrubber.java | 6 +-
.../db/index/PerColumnSecondaryIndex.java | 7 +-
.../cassandra/db/index/PerRowSecondaryIndex.java | 5 +-
.../apache/cassandra/db/index/SecondaryIndex.java | 32 +-
.../cassandra/db/index/SecondaryIndexManager.java | 26 +-
.../apache/cassandra/db/index/keys/KeysIndex.java | 14 +-
.../apache/cassandra/db/marshal/CompositeType.java | 10 +-
.../org/apache/cassandra/gms/FailureDetector.java | 6 +-
.../apache/cassandra/hadoop/BulkRecordWriter.java | 9 +-
src/java/org/apache/cassandra/io/FSError.java | 48 +++
src/java/org/apache/cassandra/io/FSReadError.java | 40 +++
src/java/org/apache/cassandra/io/FSWriteError.java | 40 +++
.../io/compress/CompressedRandomAccessReader.java | 73 +++-
.../io/compress/CompressedSequentialWriter.java | 17 +-
.../cassandra/io/compress/CompressionMetadata.java | 252 +++++++++-----
.../io/compress/CorruptBlockException.java | 33 ++
.../io/compress/CorruptedBlockException.java | 33 --
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +-
.../io/sstable/CorruptSSTableException.java | 41 +++
.../apache/cassandra/io/sstable/Descriptor.java | 2 +-
.../apache/cassandra/io/sstable/KeyIterator.java | 24 +-
.../org/apache/cassandra/io/sstable/SSTable.java | 2 +-
.../io/sstable/SSTableBoundedScanner.java | 11 +-
.../cassandra/io/sstable/SSTableDeletingTask.java | 6 +-
.../io/sstable/SSTableIdentityIterator.java | 48 ++--
.../apache/cassandra/io/sstable/SSTableReader.java | 38 +--
.../cassandra/io/sstable/SSTableScanner.java | 68 +---
.../io/sstable/SSTableSimpleUnsortedWriter.java | 19 +-
.../cassandra/io/sstable/SSTableSimpleWriter.java | 12 +-
.../apache/cassandra/io/sstable/SSTableWriter.java | 262 +++++++++------
.../cassandra/io/util/BufferedSegmentedFile.java | 15 +-
.../apache/cassandra/io/util/ColumnSortedMap.java | 2 +-
.../cassandra/io/util/CompressedSegmentedFile.java | 16 +-
.../org/apache/cassandra/io/util/FileUtils.java | 113 +++++--
.../cassandra/io/util/MmappedSegmentedFile.java | 37 +-
.../cassandra/io/util/RandomAccessReader.java | 143 +++++---
.../apache/cassandra/io/util/SegmentedFile.java | 4 +-
.../apache/cassandra/io/util/SequentialWriter.java | 29 ++-
.../cassandra/net/IncomingTcpConnection.java | 5 +-
.../cassandra/service/SnapshotVerbHandler.java | 30 +-
.../apache/cassandra/service/StorageService.java | 25 +-
.../cassandra/streaming/IncomingStreamReader.java | 16 +-
.../cassandra/streaming/StreamInSession.java | 6 +-
.../org/apache/cassandra/streaming/StreamOut.java | 27 +-
.../cassandra/streaming/StreamOutSession.java | 5 +-
.../streaming/StreamReplyVerbHandler.java | 63 ++--
.../cassandra/streaming/StreamingRepairTask.java | 14 +-
.../streaming/compress/CompressedInputStream.java | 2 +-
.../org/apache/cassandra/utils/ByteBufferUtil.java | 19 +-
.../org/apache/cassandra/utils/FBUtilities.java | 15 +-
.../org/apache/cassandra/utils/MergeIterator.java | 3 +-
test/unit/org/apache/cassandra/SchemaLoader.java | 25 +-
test/unit/org/apache/cassandra/Util.java | 4 +-
.../org/apache/cassandra/db/DirectoriesTest.java | 2 +-
.../cassandra/db/SecondaryIndexColumnSizeTest.java | 16 +-
.../compress/CompressedRandomAccessReaderTest.java | 22 +-
.../io/util/BufferedRandomAccessFileTest.java | 16 +-
.../cassandra/streaming/SerializationsTest.java | 9 +-
97 files changed, 1416 insertions(+), 1264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e393773..c99ca21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-dev
+ * clean up ioexceptions (CASSANDRA-2116)
* Introduce new json format with row level deletion (CASSANDRA-4054)
* remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
* improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 41b8f5d..db872a4 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.cache;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
@@ -35,12 +34,12 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.Pair;
public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
@@ -82,9 +81,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
if (savePeriodInSeconds > 0)
{
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- public void runMayThrow()
+ public void run()
{
submitWrite(keysToSave);
}
@@ -209,7 +208,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
}
- public void saveCache() throws IOException
+ public void saveCache()
{
logger.debug("Deleting old {} files.", cacheType);
deleteOldCacheFiles();
@@ -236,7 +235,16 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
writer = tempCacheFile(path);
writers.put(path, writer);
}
- cacheLoader.serialize(key, writer.stream);
+
+ try
+ {
+ cacheLoader.serialize(key, writer.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, writer.getPath());
+ }
+
keysWritten++;
}
}
@@ -262,11 +270,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
logger.info(String.format("Saved %s (%d items) in %d ms", cacheType, keys.size(), System.currentTimeMillis() - start));
}
- private SequentialWriter tempCacheFile(Pair<String, String> pathInfo) throws IOException
+ private SequentialWriter tempCacheFile(Pair<String, String> pathInfo)
{
File path = getCachePath(pathInfo.left, pathInfo.right, CURRENT_VERSION);
- File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
-
+ File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
return SequentialWriter.open(tmpFile, true);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 3571194..bf2a319 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -17,14 +17,15 @@
*/
package org.apache.cassandra.cache;
-import java.io.IOError;
import java.io.IOException;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import com.googlecode.concurrentlinkedhashmap.Weigher;
-
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.MemoryInputStream;
@@ -32,9 +33,6 @@ import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.utils.vint.EncodedDataInputStream;
import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Serializes cache values off-heap.
*/
@@ -121,7 +119,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new RuntimeException(e);
}
return freeableMemory;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index c8d11d2..1ee211d 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cache;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.IOError;
import java.io.IOException;
import org.apache.cassandra.db.ColumnFamily;
@@ -37,21 +36,15 @@ public class SerializingCacheProvider implements IRowCacheProvider
// Package protected for tests
static class RowCacheSerializer implements ISerializer<IRowCacheEntry>
{
- public void serialize(IRowCacheEntry cf, DataOutput out)
+ public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
{
- assert cf != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
- try
- {
- out.writeBoolean(cf instanceof RowCacheSentinel);
- if (cf instanceof RowCacheSentinel)
- out.writeLong(((RowCacheSentinel) cf).sentinelId);
- else
- ColumnFamily.serializer.serialize((ColumnFamily) cf, out, MessagingService.current_version);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+ boolean isSentinel = entry instanceof RowCacheSentinel;
+ out.writeBoolean(isSentinel);
+ if (isSentinel)
+ out.writeLong(((RowCacheSentinel) entry).sentinelId);
+ else
+ ColumnFamily.serializer.serialize((ColumnFamily) entry, out, MessagingService.current_version);
}
public IRowCacheEntry deserialize(DataInput in) throws IOException
@@ -62,13 +55,13 @@ public class SerializingCacheProvider implements IRowCacheProvider
return ColumnFamily.serializer.deserialize(in, MessagingService.current_version);
}
- public long serializedSize(IRowCacheEntry cf, TypeSizes typeSizes)
+ public long serializedSize(IRowCacheEntry entry, TypeSizes typeSizes)
{
int size = typeSizes.sizeof(true);
- if (cf instanceof RowCacheSentinel)
- size += typeSizes.sizeof(((RowCacheSentinel) cf).sentinelId);
+ if (entry instanceof RowCacheSentinel)
+ size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
else
- size += ColumnFamily.serializer.serializedSize((ColumnFamily) cf, typeSizes, MessagingService.current_version);
+ size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index ccb323b..4ff7d2d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.config;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
@@ -682,12 +681,12 @@ public final class CFMetaData
}
}
- public void reload() throws IOException
+ public void reload()
{
Row cfDefRow = SystemTable.readSchemaRow(ksName, cfName);
if (cfDefRow.cf == null || cfDefRow.cf.isEmpty())
- throw new IOException(String.format("%s not found in the schema definitions table.", ksName + ":" + cfName));
+ throw new RuntimeException(String.format("%s not found in the schema definitions table.", ksName + ":" + cfName));
try
{
@@ -695,7 +694,7 @@ public final class CFMetaData
}
catch (ConfigurationException e)
{
- throw new IOException(e);
+ throw new RuntimeException(e);
}
}
@@ -1272,8 +1271,6 @@ public final class CFMetaData
* Deserialize CF metadata from low-level representation
*
* @return Thrift-based metadata deserialized from schema
- *
- * @throws IOException on any I/O related error
*/
public static CFMetaData fromSchema(UntypedResultSet.Row result)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ef2440b..186b190 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -599,33 +600,41 @@ public class DatabaseDescriptor
/**
* Creates all storage-related directories.
- * @throws IOException when a disk problem is encountered.
*/
- public static void createAllDirectories() throws IOException
+ public static void createAllDirectories()
{
- try {
+ try
+ {
if (conf.data_file_directories.length == 0)
- {
throw new ConfigurationException("At least one DataFileDirectory must be specified");
- }
- for ( String dataFileDirectory : conf.data_file_directories )
+
+ for (String dataFileDirectory : conf.data_file_directories)
+ {
FileUtils.createDirectory(dataFileDirectory);
+ }
+
if (conf.commitlog_directory == null)
- {
throw new ConfigurationException("commitlog_directory must be specified");
- }
+
FileUtils.createDirectory(conf.commitlog_directory);
+
if (conf.saved_caches_directory == null)
- {
throw new ConfigurationException("saved_caches_directory must be specified");
- }
+
FileUtils.createDirectory(conf.saved_caches_directory);
}
- catch (ConfigurationException ex) {
- logger.error("Fatal error: " + ex.getMessage());
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal error: " + e.getMessage());
System.err.println("Bad configuration; unable to start server");
System.exit(1);
}
+ catch (FSWriteError e)
+ {
+ logger.error("Fatal error: " + e.getMessage());
+ System.err.println(e.getCause().getMessage() + "; unable to start server");
+ System.exit(1);
+ }
}
public static IPartitioner getPartitioner()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index a7cdda3..9ce5a26 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.config;
-import java.io.IOException;
import java.util.*;
import org.apache.commons.lang.ObjectUtils;
@@ -195,12 +194,12 @@ public final class KSMetaData
}
- public KSMetaData reloadAttributes() throws IOException
+ public KSMetaData reloadAttributes()
{
Row ksDefRow = SystemTable.readSchemaRow(name);
if (ksDefRow.cf == null)
- throw new IOException(String.format("%s not found in the schema definitions table (%s).", name, SystemTable.SCHEMA_KEYSPACES_CF));
+ throw new RuntimeException(String.format("%s not found in the schema definitions table (%s).", name, SystemTable.SCHEMA_KEYSPACES_CF));
return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList());
}
@@ -236,10 +235,8 @@ public final class KSMetaData
* @param row Keyspace attributes in serialized form
*
* @return deserialized keyspace without cf_defs
- *
- * @throws IOException if deserialization failed
*/
- public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms) throws IOException
+ public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms)
{
UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one();
try
@@ -263,10 +260,8 @@ public final class KSMetaData
* @param serializedCFs Collection of the serialized ColumnFamilies
*
* @return deserialized keyspace with cf_defs
- *
- * @throws IOException if deserialization failed
*/
- public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs) throws IOException
+ public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs)
{
Map<String, CFMetaData> cfs = deserializeColumnFamilies(serializedCFs);
return fromSchema(serializedKs, cfs.values());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 193d892..f4f06df 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -171,16 +170,8 @@ public class CollationController
&& cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
{
RowMutation rm = new RowMutation(cfs.table.name, new Row(filter.key, returnCF.cloneMe()));
- try
- {
- // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
- Table.open(rm.getTable()).apply(rm, false, false);
- }
- catch (IOException e)
- {
- // log and allow the result to be returned
- logger.error("Error re-writing read results", e);
- }
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Table.open(rm.getTable()).apply(rm, false, false);
}
// Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 695e480..03497ef 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
import java.io.File;
-import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
@@ -142,7 +141,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/** ops count last time we computed liveRatio */
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
- public void reload() throws IOException
+ public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -371,14 +370,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (components.contains(Component.COMPACTED_MARKER) || desc.temporary)
{
- try
- {
- SSTable.delete(desc, components);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ SSTable.delete(desc, components);
continue;
}
@@ -391,14 +383,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
logger.warn("Removing orphans for {}: {}", desc, components);
for (Component component : components)
{
- try
- {
- FileUtils.deleteWithConfirm(desc.filenameFor(component));
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
}
@@ -516,10 +501,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.getIndexedColumns());
}
- catch (IOException e)
- {
- throw new IOError(e);
- }
finally
{
SSTableReader.releaseReferences(newSSTables);
@@ -547,10 +528,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
cfs.indexManager.maybeBuildSecondaryIndexes(sstables, indexes);
cfs.indexManager.setIndexBuilt(indexes);
}
- catch (IOException e)
- {
- throw new IOError(e);
- }
finally
{
SSTableReader.releaseReferences(sstables);
@@ -578,7 +555,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return getTempSSTablePath(location, version);
}
- public String getTempSSTablePath(File directory, Descriptor.Version version)
+ private String getTempSSTablePath(File directory)
+ {
+ return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
+ }
+
+ private String getTempSSTablePath(File directory, Descriptor.Version version)
{
Descriptor desc = new Descriptor(version,
directory,
@@ -589,11 +571,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return desc.filenameFor(Component.DATA);
}
- public String getTempSSTablePath(File directory)
- {
- return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
- }
-
/** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */
public Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
{
@@ -655,7 +632,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
return postFlushExecutor.submit(new WrappedRunnable()
{
- public void runMayThrow() throws InterruptedException, IOException, ExecutionException
+ public void runMayThrow() throws InterruptedException, ExecutionException
{
latch.await();
@@ -1392,14 +1369,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public void close() throws IOException
{
SSTableReader.releaseReferences(view.sstables);
- try
- {
- iterator.close();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ iterator.close();
}
};
}
@@ -1477,7 +1447,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new RuntimeException(e);
}
}
}
@@ -1498,9 +1468,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (SSTableReader ssTable : currentView.sstables)
{
File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
-
- // hard links
- ssTable.createLinks(snapshotDirectory.getPath());
+ ssTable.createLinks(snapshotDirectory.getPath()); // hard links
if (logger.isDebugEnabled())
logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
" created in " + snapshotDirectory);
@@ -1509,10 +1477,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (cfs.compactionStrategy instanceof LeveledCompactionStrategy)
cfs.directories.snapshotLeveledManifest(snapshotName);
}
- catch (IOException e)
- {
- throw new IOError(e);
- }
finally
{
SSTableReader.releaseReferences(currentView.sstables);
@@ -1557,7 +1521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return directories.snapshotExists(snapshotName);
}
- public void clearSnapshot(String snapshotName) throws IOException
+ public void clearSnapshot(String snapshotName)
{
directories.clearSnapshot(snapshotName);
}
@@ -1708,7 +1672,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a Future to the delete operation. Call the future's get() to make
* sure the column family has been deleted
*/
- public Future<?> truncate() throws IOException, ExecutionException, InterruptedException
+ public Future<?> truncate() throws ExecutionException, InterruptedException
{
// We have two goals here:
// - truncate should delete everything written before truncate was invoked
@@ -1951,17 +1915,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return intern(name);
}
- public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context) throws IOException
+ public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context)
{
SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context);
- return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.Version.CURRENT),
- estimatedRows,
- metadata,
- partitioner,
- sstableMetadataCollector);
+ String filename = getFlushPath(estimatedSize, Descriptor.Version.CURRENT);
+ return new SSTableWriter(filename, estimatedRows, metadata, partitioner, sstableMetadataCollector);
}
- public SSTableWriter createCompactionWriter(long estimatedRows, File location, Collection<SSTableReader> sstables) throws IOException
+ public SSTableWriter createCompactionWriter(long estimatedRows, File location, Collection<SSTableReader> sstables)
{
ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index b32811d..5a691bc 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -19,13 +19,13 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,7 +40,7 @@ public class ColumnSerializer implements IColumnSerializer
public final static int COUNTER_UPDATE_MASK = 0x08;
public final static int RANGE_TOMBSTONE_MASK = 0x10;
- public void serialize(IColumn column, DataOutput dos)
+ public void serialize(IColumn column, DataOutput dos) throws IOException
{
assert column.name().remaining() > 0;
ByteBufferUtil.writeWithShortLength(column.name(), dos);
@@ -138,15 +138,17 @@ public class ColumnSerializer implements IColumnSerializer
String details = "";
if (dis instanceof FileDataInput)
{
+ FileDataInput fdis = (FileDataInput)dis;
+ long remaining;
try
{
- FileDataInput fdis = (FileDataInput)dis;
- details = String.format(" (%s, %d bytes remaining)", fdis.getPath(), fdis.bytesRemaining());
+ remaining = fdis.bytesRemaining();
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSReadError(e, fdis.getPath());
}
+ details = String.format(" (%s, %d bytes remaining)", fdis.getPath(), remaining);
}
return new CorruptColumnException(String.format(format, name.remaining(), details));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index ee034ae..1061a79 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,13 +24,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.*;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -38,7 +38,6 @@ import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;
-import org.apache.cassandra.utils.WrappedRunnable;
public class DataTracker
{
@@ -159,12 +158,12 @@ public class DataTracker
if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
return;
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- protected void runMayThrow() throws Exception
+ public void run()
{
File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
- sstable.createLinks(backupsDir.getCanonicalPath());
+ sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
}
};
StorageService.tasks.execute(runnable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 6660916..f156f39 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -126,7 +126,7 @@ public class DefsTable
public static final String OLD_SCHEMA_CF = "Schema";
/* dumps current keyspace definitions to storage */
- public static synchronized void dumpToStorage(Collection<KSMetaData> keyspaces) throws IOException
+ public static synchronized void dumpToStorage(Collection<KSMetaData> keyspaces)
{
long timestamp = System.currentTimeMillis();
@@ -138,10 +138,8 @@ public class DefsTable
* Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_CF)
*
* @return Collection of found keyspace definitions
- *
- * @throws IOException if failed to read SCHEMA_KEYSPACES_CF
*/
- public static Collection<KSMetaData> loadFromTable() throws IOException
+ public static Collection<KSMetaData> loadFromTable()
{
List<Row> serializedSchema = SystemTable.serializedSchema(SystemTable.SCHEMA_KEYSPACES_CF);
@@ -263,7 +261,6 @@ public class DefsTable
}
private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
- throws ConfigurationException, IOException
{
// calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty)
MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
@@ -404,7 +401,7 @@ public class DefsTable
Table.open(ksm.name);
}
- private static void addColumnFamily(CFMetaData cfm) throws IOException
+ private static void addColumnFamily(CFMetaData cfm)
{
assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null;
KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName);
@@ -422,7 +419,7 @@ public class DefsTable
Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
}
- private static void updateKeyspace(KSMetaData newState) throws IOException
+ private static void updateKeyspace(KSMetaData newState)
{
KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
assert oldKsm != null;
@@ -442,7 +439,7 @@ public class DefsTable
}
}
- private static void updateColumnFamily(CFMetaData newState) throws IOException
+ private static void updateColumnFamily(CFMetaData newState)
{
CFMetaData cfm = Schema.instance.getCFMetaData(newState.ksName, newState.cfName);
assert cfm != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 20a8c20..c146c9f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -19,22 +19,21 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.FileFilter;
-import java.io.IOError;
import java.io.IOException;
import java.util.*;
-import org.apache.commons.lang.StringUtils;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.Pair;
/**
@@ -98,15 +97,8 @@ public class Directories
if (!StorageService.instance.isClientMode())
{
- try
- {
- for (File dir : sstableDirectories)
- FileUtils.createDirectory(dir);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ for (File dir : sstableDirectories)
+ FileUtils.createDirectory(dir);
}
}
@@ -325,13 +317,14 @@ public class Directories
return manifestFile;
}
- public void snapshotLeveledManifest(String snapshotName) throws IOException
+ public void snapshotLeveledManifest(String snapshotName)
{
File manifest = tryGetLeveledManifest();
if (manifest != null)
{
File snapshotDirectory = getOrCreate(manifest.getParentFile(), SNAPSHOT_SUBDIR, snapshotName);
- CLibrary.createHardLink(manifest, new File(snapshotDirectory, manifest.getName()));
+ File target = new File(snapshotDirectory, manifest.getName());
+ FileUtils.createHardLink(manifest, target);
}
}
@@ -346,7 +339,7 @@ public class Directories
return false;
}
- public void clearSnapshot(String snapshotName) throws IOException
+ public void clearSnapshot(String snapshotName)
{
// If snapshotName is empty or null, we will delete the entire snapshot directory
String tag = snapshotName == null ? "" : snapshotName;
@@ -368,11 +361,11 @@ public class Directories
if (dir.exists())
{
if (!dir.isDirectory())
- throw new IOError(new IOException(String.format("Invalid directory path %s: path exists but is not a directory", dir)));
+ throw new AssertionError(String.format("Invalid directory path %s: path exists but is not a directory", dir));
}
else if (!dir.mkdirs())
{
- throw new IOError(new IOException("Unable to create directory " + dir));
+ throw new FSWriteError(new IOException("Unable to create directory " + dir), dir);
}
return dir;
}
@@ -410,15 +403,8 @@ public class Directories
// Check whether the migration might create too long a filename
int longestLocation = -1;
- try
- {
- for (File loc : dataFileLocations)
- longestLocation = Math.max(longestLocation, loc.getCanonicalPath().length());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ for (File loc : dataFileLocations)
+ longestLocation = Math.max(longestLocation, FileUtils.getCanonicalPath(loc).length());
// Check that migration won't error out halfway through from too-long paths. For Windows, we need to check
// total path length <= 255 (see http://msdn.microsoft.com/en-us/library/aa365247.aspx and discussion on CASSANDRA-2749);
@@ -519,29 +505,22 @@ public class Directories
private static void migrateFile(File file, File ksDir, String additionalPath)
{
- try
- {
- if (file.isDirectory())
- return;
+ if (file.isDirectory())
+ return;
- String name = file.getName();
- boolean isManifest = name.endsWith(LeveledManifest.EXTENSION);
- String cfname = isManifest
- ? name.substring(0, name.length() - LeveledManifest.EXTENSION.length())
- : name.substring(0, name.indexOf(Component.separator));
+ String name = file.getName();
+ boolean isManifest = name.endsWith(LeveledManifest.EXTENSION);
+ String cfname = isManifest
+ ? name.substring(0, name.length() - LeveledManifest.EXTENSION.length())
+ : name.substring(0, name.indexOf(Component.separator));
- int idx = cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); // idx > 0 => secondary index
- String dirname = idx > 0 ? cfname.substring(0, idx) : cfname;
- File destDir = getOrCreate(ksDir, dirname, additionalPath);
+ int idx = cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); // idx > 0 => secondary index
+ String dirname = idx > 0 ? cfname.substring(0, idx) : cfname;
+ File destDir = getOrCreate(ksDir, dirname, additionalPath);
- File destFile = new File(destDir, isManifest ? name : ksDir.getName() + Component.separator + name);
- logger.debug(String.format("[upgrade to 1.1] Moving %s to %s", file, destFile));
- FileUtils.renameWithConfirm(file, destFile);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ File destFile = new File(destDir, isManifest ? name : ksDir.getName() + Component.separator + name);
+ logger.debug(String.format("[upgrade to 1.1] Moving %s to %s", file, destFile));
+ FileUtils.renameWithConfirm(file, destFile);
}
// Hack for tests, don't use otherwise
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a2d27f1..8d8ad89 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.db;
import java.io.File;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Function;
+import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,8 +255,7 @@ public class Memtable
return builder.toString();
}
-
- private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws IOException, ExecutionException, InterruptedException
+ private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws ExecutionException, InterruptedException
{
logger.info("Writing " + this);
@@ -293,15 +292,15 @@ public class Memtable
}
ssTable = writer.closeAndOpenReader();
+ logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
+ ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
+ return ssTable;
}
- catch (Exception e)
+ catch (Throwable e)
{
writer.abort();
- throw FBUtilities.unchecked(e);
+ throw Throwables.propagate(e);
}
- logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
- ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
- return ssTable;
}
public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index bc5e820..713d97d 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -17,9 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOError;
-import java.io.IOException;
-
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -28,16 +25,9 @@ public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
{
public void doVerb(MessageIn<RowMutation> message, String id)
{
- try
- {
- RowMutation rm = message.payload;
- rm.apply();
- WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ RowMutation rm = message.payload;
+ rm.apply();
+ WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
+ MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 7024b01..06e8c37 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -256,14 +256,13 @@ public class RowMutation implements IMutation
* This is equivalent to calling commit. Applies the changes to
* to the table that is obtained by calling Table.open().
*/
- public void apply() throws IOException
+ public void apply()
{
KSMetaData ksm = Schema.instance.getTableDefinition(getTable());
-
Table.open(table).apply(this, ksm.durableWrites);
}
- public void applyUnsafe() throws IOException
+ public void applyUnsafe()
{
Table.open(table).apply(this, false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 041c521..2c8444f 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -23,14 +23,15 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.RepairCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SliceFromReadCommand extends ReadCommand
{
@@ -61,7 +62,7 @@ public class SliceFromReadCommand extends ReadCommand
return readCommand;
}
- public Row getRow(Table table) throws IOException
+ public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
return table.getRow(new QueryFilter(dk, queryPath, filter));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
index c6ce0df..f95dc8f 100644
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ b/src/java/org/apache/cassandra/db/SuperColumn.java
@@ -380,23 +380,16 @@ class SuperColumnSerializer implements IColumnSerializer
return comparator;
}
- public void serialize(IColumn column, DataOutput dos)
+ public void serialize(IColumn column, DataOutput dos) throws IOException
{
SuperColumn superColumn = (SuperColumn)column;
ByteBufferUtil.writeWithShortLength(superColumn.name(), dos);
- try
+ DeletionInfo.serializer().serialize(superColumn.deletionInfo(), dos, MessagingService.VERSION_10);
+ Collection<IColumn> columns = superColumn.getSubColumns();
+ dos.writeInt(columns.size());
+ for (IColumn subColumn : columns)
{
- DeletionInfo.serializer().serialize(superColumn.deletionInfo(), dos, MessagingService.VERSION_10);
- Collection<IColumn> columns = superColumn.getSubColumns();
- dos.writeInt(columns.size());
- for (IColumn subColumn : columns)
- {
- Column.serializer().serialize(subColumn, dos);
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ Column.serializer().serialize(subColumn, dos);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index d271b34..6910905 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@@ -26,7 +25,6 @@ import java.util.concurrent.ExecutionException;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +101,7 @@ public class SystemTable
}
}
- private static void setupVersion() throws IOException
+ private static void setupVersion()
{
String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version) VALUES ('%s', '%s', '%s', '%s')";
processInternal(String.format(req, LOCAL_CF,
@@ -279,9 +277,9 @@ public class SystemTable
* 3. files are present but you can't read them: bad
* @throws ConfigurationException
*/
- public static void checkHealth() throws ConfigurationException, IOException
+ public static void checkHealth() throws ConfigurationException
{
- Table table = null;
+ Table table;
try
{
table = Table.open(Table.SYSTEM_TABLE);
@@ -324,7 +322,7 @@ public class SystemTable
: deserializeTokens(result.one().getBytes("token_bytes"));
}
- public static int incrementAndGetGeneration() throws IOException
+ public static int incrementAndGetGeneration()
{
String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
@@ -404,15 +402,7 @@ public class SystemTable
cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes(table));
rm.add(cf);
- try
- {
- rm.apply();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
-
+ rm.apply();
forceBlockingFlush(INDEX_CF);
}
@@ -420,15 +410,7 @@ public class SystemTable
{
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes(table));
rm.delete(new QueryPath(INDEX_CF, null, ByteBufferUtil.bytes(indexName)), FBUtilities.timestampMicros());
- try
- {
- rm.apply();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
-
+ rm.apply();
forceBlockingFlush(INDEX_CF);
}
@@ -511,15 +493,8 @@ public class SystemTable
RowMutation rmAll = new RowMutation(Table.SYSTEM_TABLE, ALL_LOCAL_NODE_ID_KEY);
rmCurrent.add(cf2);
rmAll.add(cf);
- try
- {
- rmCurrent.apply();
- rmAll.apply();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ rmCurrent.apply();
+ rmAll.apply();
}
public static List<NodeId.NodeIdRecord> getOldLocalNodeIds()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 8617f44..b6ddcad 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -26,6 +25,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -34,12 +38,6 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
/**
* It represents a Keyspace.
@@ -63,16 +61,7 @@ public class Table
static
{
if (!StorageService.instance.isClientMode())
- {
- try
- {
- DatabaseDescriptor.createAllDirectories();
- }
- catch (IOException ex)
- {
- throw new IOError(ex);
- }
- }
+ DatabaseDescriptor.createAllDirectories();
}
/* Table name. */
@@ -221,7 +210,7 @@ public class Table
* @param snapshotName the user supplied snapshot name. It empty or null,
* all the snapshots will be cleaned
*/
- public void clearSnapshot(String snapshotName) throws IOException
+ public void clearSnapshot(String snapshotName)
{
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
{
@@ -316,15 +305,8 @@ public class Table
ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
assert cfs.getColumnFamilyName().equals(cfName);
- try
- {
- cfs.metadata.reload();
- cfs.reload();
- }
- catch (IOException e)
- {
- throw FBUtilities.unchecked(e);
- }
+ cfs.metadata.reload();
+ cfs.reload();
}
else
{
@@ -332,14 +314,14 @@ public class Table
}
}
- public Row getRow(QueryFilter filter) throws IOException
+ public Row getRow(QueryFilter filter)
{
ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
ColumnFamily columnFamily = cfStore.getColumnFamily(filter, ArrayBackedSortedColumns.factory());
return new Row(filter.key, columnFamily);
}
- public void apply(RowMutation mutation, boolean writeCommitLog) throws IOException
+ public void apply(RowMutation mutation, boolean writeCommitLog)
{
apply(mutation, writeCommitLog, true);
}
@@ -351,9 +333,8 @@ public class Table
* may happen concurrently, depending on the CL Executor type.
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
- * @throws IOException
*/
- public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes) throws IOException
+ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
if (logger.isDebugEnabled())
logger.debug("applying mutation of row {}", ByteBufferUtil.bytesToHex(mutation.key()));
@@ -501,14 +482,7 @@ public class Table
{
ColumnFamily cf = readCurrentIndexedColumns(key, cfs, indexedColumns);
if (cf != null)
- try
- {
- cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
}
}
finally
@@ -522,7 +496,7 @@ public class Table
return indexLocks[Math.abs(key.hashCode() % indexLocks.length)];
}
- public List<Future<?>> flush() throws IOException
+ public List<Future<?>> flush()
{
List<Future<?>> futures = new ArrayList<Future<?>>();
for (UUID cfId : columnFamilyStores.keySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 400efe2..ff9448f 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.db;
-import java.io.IOError;
-import java.io.IOException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.FSError;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -33,34 +31,29 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation>
public void doVerb(MessageIn<Truncation> message, String id)
{
+ Truncation t = message.payload;
+ logger.debug("Applying {}", t);
try
{
- Truncation t = message.payload;
- logger.debug("Applying {}", t);
-
- try
- {
- ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
- cfs.truncate().get();
- }
- catch (Exception e)
- {
- logger.error("Error in truncation", e);
- respondError(t, message);
- }
- logger.debug("Truncate operation succeeded at this host");
-
- TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
- logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.from });
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+ ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
+ cfs.truncate().get();
}
- catch (IOException e)
+ catch (Exception e)
{
- throw new IOError(e);
+ logger.error("Error in truncation", e);
+ respondError(t, message);
+
+ if (FSError.findNested(e) != null)
+ throw FSError.findNested(e);
}
+ logger.debug("Truncate operation succeeded at this host");
+
+ TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
+ logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.from });
+ MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
- private static void respondError(Truncation t, MessageIn truncateRequestMessage) throws IOException
+ private static void respondError(Truncation t, MessageIn truncateRequestMessage)
{
TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, false);
MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index 8dd7b63..98a9552 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.columniterator;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -107,7 +107,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
catch (IOException e)
{
sstable.markSuspect();
- throw new IOError(e);
+ throw new CorruptSSTableException(e, sstable.descriptor);
}
}
@@ -146,15 +146,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
OnDiskAtom column = blockColumns.poll();
if (column == null)
{
- try
- {
- if (!fetcher.fetchMoreData())
- return endOfData();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ if (!fetcher.fetchMoreData())
+ return endOfData();
}
else
{
@@ -204,7 +197,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
protected abstract boolean setNextSlice();
- protected abstract boolean fetchMoreData() throws IOException;
+ protected abstract boolean fetchMoreData();
protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
{
@@ -284,7 +277,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
return currentSliceIdx < slices.length;
}
- protected boolean fetchMoreData() throws IOException
+ protected boolean fetchMoreData()
{
if (!hasMoreSlice())
return false;
@@ -325,7 +318,14 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
if (gotSome)
return true;
}
- return getNextBlock();
+ try
+ {
+ return getNextBlock();
+ }
+ catch (IOException e)
+ {
+ throw new CorruptSSTableException(e, file.getPath());
+ }
}
private boolean getNextBlock() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 1225e60..0a5bed9 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.columniterator;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -34,6 +33,7 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
@@ -71,7 +71,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
catch (IOException e)
{
sstable.markSuspect();
- throw new IOError(e);
+ throw new CorruptSSTableException(e, sstable.descriptor);
}
finally
{
@@ -91,10 +91,10 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
{
read(sstable, file, indexEntry);
}
- catch (IOException ioe)
+ catch (IOException e)
{
sstable.markSuspect();
- throw new IOError(ioe);
+ throw new CorruptSSTableException(e, sstable.descriptor);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index f974600..b9a5a64 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.columniterator;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
@@ -86,7 +86,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
catch (IOException e)
{
sstable.markSuspect();
- throw new IOError(e);
+ throw new CorruptSSTableException(e, sstable.descriptor);
}
}
@@ -103,7 +103,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
}
catch (IOException e)
{
- throw new RuntimeException("error reading " + i + " of " + columns, e);
+ throw new CorruptSSTableException(e, file.getPath());
}
if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
return endOfData();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
index bc6b72c..4434532 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.*;
@@ -93,14 +92,7 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
}
// now sync and set the tasks' values (which allows thread calling get() to proceed)
- try
- {
- CommitLog.instance.sync();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ CommitLog.instance.sync();
for (int i = 0; i < incompleteTasks.size(); i++)
{
incompleteTasks.get(i).set(taskValues.get(i));
@@ -153,7 +145,7 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
{
new Thread(new WrappedRunnable()
{
- public void runMayThrow() throws InterruptedException, IOException
+ public void runMayThrow() throws InterruptedException
{
while (!queue.isEmpty())
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 92bcdcf..e3f3c13 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -23,19 +23,19 @@ import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.net.MessagingService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.net.MessagingService;
/*
* Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -60,17 +60,10 @@ public class CommitLog implements CommitLogMBean
private CommitLog()
{
- try
- {
- DatabaseDescriptor.createAllDirectories();
+ DatabaseDescriptor.createAllDirectories();
- allocator = new CommitLogAllocator();
- activateNextSegment();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ allocator = new CommitLogAllocator();
+ activateNextSegment();
executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
? new BatchCommitLogExecutorService()
@@ -90,7 +83,7 @@ public class CommitLog implements CommitLogMBean
/**
* FOR TESTING PURPOSES. See CommitLogAllocator.
*/
- public void resetUnsafe() throws IOException
+ public void resetUnsafe()
{
allocator.resetUnsafe();
activateNextSegment();
@@ -165,7 +158,7 @@ public class CommitLog implements CommitLogMBean
{
Callable<ReplayPosition> task = new Callable<ReplayPosition>()
{
- public ReplayPosition call() throws Exception
+ public ReplayPosition call()
{
return activeSegment.getContext();
}
@@ -188,7 +181,7 @@ public class CommitLog implements CommitLogMBean
*
* @param rm the RowMutation to add to the log
*/
- public void add(RowMutation rm) throws IOException
+ public void add(RowMutation rm)
{
long totalSize = RowMutation.serializer.serializedSize(rm, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
@@ -207,11 +200,11 @@ public class CommitLog implements CommitLogMBean
* @param cfId the column family ID that was flushed
* @param context the replay position of the flush
*/
- public void discardCompletedSegments(final UUID cfId, final ReplayPosition context) throws IOException
+ public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
{
Callable task = new Callable()
{
- public Object call() throws IOException
+ public Object call()
{
logger.debug("discard completed log segments for {}, column family {}", context, cfId);
@@ -266,7 +259,7 @@ public class CommitLog implements CommitLogMBean
/**
* Forces a disk flush on the commit log files that need it.
*/
- public void sync() throws IOException
+ public void sync()
{
for (CommitLogSegment segment : allocator.getActiveSegments())
{
@@ -307,7 +300,7 @@ public class CommitLog implements CommitLogMBean
Callable<?> task = new Callable()
{
- public Object call() throws IOException
+ public Object call()
{
if (activeSegment.position() > 0)
activateNextSegment();
@@ -324,7 +317,7 @@ public class CommitLog implements CommitLogMBean
*
* @return the newly activated segment
*/
- private void activateNextSegment() throws IOException
+ private void activateNextSegment()
{
activeSegment = allocator.fetchSegment();
logger.debug("Active segment is now {}", activeSegment);
@@ -367,25 +360,25 @@ public class CommitLog implements CommitLogMBean
public void run()
{
+ if (!activeSegment.hasCapacityFor(rowMutation))
+ {
+ CommitLogSegment oldSegment = activeSegment;
+ activateNextSegment();
+ // Now we can run the user defined command just before switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
+ }
try
{
- if (!activeSegment.hasCapacityFor(rowMutation))
- {
- CommitLogSegment oldSegment = activeSegment;
- activateNextSegment();
- // Now we can run the user defined command just before switching to the new commit log.
- // (Do this here instead of in the recycle call so we can get a head start on the archive.)
- archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
- }
activeSegment.write(rowMutation);
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSWriteError(e, activeSegment.getPath());
}
}
- public Object call() throws Exception
+ public Object call()
{
run();
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index 0a19301..2855979 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.db.commitlog;
import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
@@ -183,15 +181,8 @@ public class CommitLogAllocator
|| CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
{
// (don't decrease managed size, since this was never a "live" segment)
- try
- {
- logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
- FileUtils.deleteWithConfirm(file);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
+ FileUtils.deleteWithConfirm(file);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ef45a6d..9bc0179 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -27,6 +27,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Checksum;
+import com.google.common.collect.Ordering;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
@@ -43,12 +48,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.PureJavaCrc32;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.commons.lang.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Ordering;
public class CommitLogReplayer
{
@@ -92,7 +92,7 @@ public class CommitLogReplayer
recover(file);
}
- public int blockForWrites() throws IOException
+ public int blockForWrites()
{
for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index bf67095..e9ce199 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.commitlog;
import java.io.File;
-import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -36,6 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.PureJavaCrc32;
@@ -119,7 +119,7 @@ public class CommitLogSegment
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSWriteError(e, logFile);
}
}
@@ -130,15 +130,8 @@ public class CommitLogSegment
{
// TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recyling?
close();
- try
- {
- if (deleteFile)
- FileUtils.deleteWithConfirm(logFile);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ if (deleteFile)
+ FileUtils.deleteWithConfirm(logFile);
}
/**
@@ -157,10 +150,10 @@ public class CommitLogSegment
{
sync();
}
- catch (IOException e)
+ catch (FSWriteError e)
{
- // This is a best effort thing anyway
- logger.warn("I/O error flushing " + this + " " + e);
+ logger.error("I/O error flushing " + this + " " + e);
+ throw e;
}
close();
@@ -235,11 +228,18 @@ public class CommitLogSegment
/**
* Forces a disk flush for this segment file.
*/
- public void sync() throws IOException
+ public void sync()
{
if (needsSync)
{
- buffer.force();
+ try
+ {
+ buffer.force();
+ }
+ catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
+ {
+ throw new FSWriteError(e, getPath());
+ }
needsSync = false;
}
}
@@ -283,7 +283,7 @@ public class CommitLogSegment
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSWriteError(e, getPath());
}
}