You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/27 22:47:19 UTC
[4/5] clean up ioexceptions patch by Aleksey Yeschenko and jbellis
for CASSANDRA-2116
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e188636..381b934 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,29 +24,29 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;
-import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.index.keys.KeysIndex;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.service.CacheService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.InstrumentingCache;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.keys.KeysIndex;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
@@ -271,7 +271,7 @@ public class SSTableReader extends SSTable
IndexSummary isummary,
Filter bf,
long maxDataAge,
- SSTableMetadata sstableMetadata) throws IOException
+ SSTableMetadata sstableMetadata)
{
assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
return new SSTableReader(desc,
@@ -295,7 +295,6 @@ public class SSTableReader extends SSTable
Filter bloomFilter,
long maxDataAge,
SSTableMetadata sstableMetadata)
- throws IOException
{
super(desc, components, metadata, partitioner);
this.sstableMetadata = sstableMetadata;
@@ -788,7 +787,7 @@ public class SSTableReader extends SSTable
catch (IOException e)
{
markSuspect();
- throw new IOError(e);
+ throw new CorruptSSTableException(e, input.getPath());
}
finally
{
@@ -930,13 +929,13 @@ public class SSTableReader extends SSTable
return in.readLong();
}
- public void createLinks(String snapshotDirectoryPath) throws IOException
+ public void createLinks(String snapshotDirectoryPath)
{
for (Component component : components)
{
File sourceFile = new File(descriptor.filenameFor(component));
File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
- CLibrary.createHardLink(sourceFile, targetLink);
+ FileUtils.createHardLink(sourceFile, targetLink);
}
}
@@ -1046,14 +1045,14 @@ public class SSTableReader extends SSTable
return sstableMetadata.ancestors;
}
- public RandomAccessReader openDataReader(boolean skipIOCache) throws IOException
+ public RandomAccessReader openDataReader(boolean skipIOCache)
{
return compression
? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(), skipIOCache)
: RandomAccessReader.open(new File(getFilename()), skipIOCache);
}
- public RandomAccessReader openIndexReader(boolean skipIOCache) throws IOException
+ public RandomAccessReader openIndexReader(boolean skipIOCache)
{
return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache);
}
@@ -1090,14 +1089,7 @@ public class SSTableReader extends SSTable
{
for (SSTableReader sstable : sstables)
{
- try
- {
- sstable.releaseReference();
- }
- catch (Exception ex)
- {
- logger.error("Failed releasing reference on " + sstable, ex);
- }
+ sstable.releaseReference();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 94e8522..dafb12b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.IOError;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@@ -52,16 +51,8 @@ public class SSTableScanner implements ICompactionScanner
*/
SSTableScanner(SSTableReader sstable, boolean skipCache)
{
- try
- {
- this.dfile = sstable.openDataReader(skipCache);
- this.ifile = sstable.openIndexReader(skipCache);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new IOError(e);
- }
+ this.dfile = sstable.openDataReader(skipCache);
+ this.ifile = sstable.openIndexReader(skipCache);
this.sstable = sstable;
this.filter = null;
}
@@ -72,16 +63,8 @@ public class SSTableScanner implements ICompactionScanner
*/
SSTableScanner(SSTableReader sstable, QueryFilter filter)
{
- try
- {
- this.dfile = sstable.openDataReader(false);
- this.ifile = sstable.openIndexReader(false);
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new IOError(e);
- }
+ this.dfile = sstable.openDataReader(false);
+ this.ifile = sstable.openIndexReader(false);
this.sstable = sstable;
this.filter = filter;
}
@@ -126,20 +109,13 @@ public class SSTableScanner implements ICompactionScanner
catch (IOException e)
{
sstable.markSuspect();
- throw new RuntimeException("corrupt sstable", e);
+ throw new CorruptSSTableException(e, ifile.getPath());
}
}
public long getLengthInBytes()
{
- try
- {
- return dfile.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ return dfile.length();
}
public long getCurrentPosition()
@@ -182,17 +158,9 @@ public class SSTableScanner implements ICompactionScanner
public boolean hasNext()
{
- try
- {
- if (row == null)
- return !dfile.isEOF();
- return finishedAt < dfile.length();
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new RuntimeException(e);
- }
+ if (row == null)
+ return !dfile.isEOF();
+ return finishedAt < dfile.length();
}
public OnDiskAtomIterator next()
@@ -215,7 +183,7 @@ public class SSTableScanner implements ICompactionScanner
catch (IOException e)
{
sstable.markSuspect();
- throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e);
+ throw new CorruptSSTableException(e, dfile.getPath());
}
}
@@ -238,17 +206,9 @@ public class SSTableScanner implements ICompactionScanner
public boolean hasNext()
{
- try
- {
- if (row == null)
- return !ifile.isEOF();
- return nextKey != null;
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new RuntimeException(e);
- }
+ if (row == null)
+ return !ifile.isEOF();
+ return nextKey != null;
}
public OnDiskAtomIterator next()
@@ -286,7 +246,7 @@ public class SSTableScanner implements ICompactionScanner
catch (IOException e)
{
sstable.markSuspect();
- throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e);
+ throw new CorruptSSTableException(e, ifile.getPath());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 43cd42c..9207276 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -23,8 +23,14 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
+
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TreeMapBackedSortedColumns;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.compress.CompressionParameters;
@@ -69,7 +75,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
AbstractType<?> comparator,
AbstractType<?> subComparator,
int bufferSizeInMB,
- CompressionParameters compressParameters) throws IOException
+ CompressionParameters compressParameters)
{
super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
@@ -82,7 +88,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
String columnFamily,
AbstractType<?> comparator,
AbstractType<?> subComparator,
- int bufferSizeInMB) throws IOException
+ int bufferSizeInMB)
{
this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null));
}
@@ -143,6 +149,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
catch (InterruptedException e)
{
throw new RuntimeException(e);
+
}
buffer = new Buffer();
currentSize = 0;
@@ -156,7 +163,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
if (diskWriter.exception instanceof IOException)
throw (IOException) diskWriter.exception;
else
- throw new RuntimeException(diskWriter.exception);
+ throw Throwables.propagate(diskWriter.exception);
}
}
@@ -165,7 +172,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
private class DiskWriter extends Thread
{
- volatile Exception exception = null;
+ volatile Throwable exception = null;
public void run()
{
@@ -184,7 +191,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
writer.closeAndOpenReader();
}
}
- catch (Exception e)
+ catch (Throwable e)
{
if (writer != null)
writer.abort();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index e5ad2f9..d0b1b4a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -18,12 +18,12 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.IOException;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSError;
/**
* A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -54,19 +54,19 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
String keyspace,
String columnFamily,
AbstractType<?> comparator,
- AbstractType<?> subComparator) throws IOException
+ AbstractType<?> subComparator)
{
this(directory,
new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner);
}
- public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
{
super(directory, metadata, partitioner);
writer = getWriter();
}
- public void close() throws IOException
+ public void close()
{
try
{
@@ -74,14 +74,14 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
writeRow(currentKey, columnFamily);
writer.closeAndOpenReader();
}
- catch (IOException e)
+ catch (FSError e)
{
writer.abort();
throw e;
}
}
- protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+ protected void writeRow(DecoratedKey key, ColumnFamily columnFamily)
{
writer.append(key, columnFamily);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 05f5b25..e7128f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -22,16 +22,16 @@ import java.util.*;
import java.util.regex.Pattern;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.config.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.util.*;
@@ -50,7 +50,7 @@ public class SSTableWriter extends SSTable
private final SSTableMetadata.Collector sstableMetadataCollector;
private final TypeSizes typeSizes = TypeSizes.NATIVE;
- public SSTableWriter(String filename, long keyCount) throws IOException
+ public SSTableWriter(String filename, long keyCount)
{
this(filename,
keyCount,
@@ -80,7 +80,7 @@ public class SSTableWriter extends SSTable
long keyCount,
CFMetaData metadata,
IPartitioner<?> partitioner,
- SSTableMetadata.Collector sstableMetadataCollector) throws IOException
+ SSTableMetadata.Collector sstableMetadataCollector)
{
super(Descriptor.fromFilename(filename),
components(metadata),
@@ -114,23 +114,17 @@ public class SSTableWriter extends SSTable
iwriter.mark();
}
- public void resetAndTruncate()
+ // NOT necessarily an FS error - not throwing FSWE.
+ public void resetAndTruncate() throws IOException
{
- try
- {
- dataFile.resetAndTruncate(dataMark);
- iwriter.resetAndTruncate();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ dataFile.resetAndTruncate(dataMark);
+ iwriter.resetAndTruncate();
}
/**
* Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
*/
- private long beforeAppend(DecoratedKey decoratedKey) throws IOException
+ private long beforeAppend(DecoratedKey decoratedKey)
{
assert decoratedKey != null : "Keys must not be null";
if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
@@ -138,7 +132,7 @@ public class SSTableWriter extends SSTable
return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
}
- private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index) throws IOException
+ private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
{
lastWrittenKey = decoratedKey;
this.last = lastWrittenKey;
@@ -153,61 +147,89 @@ public class SSTableWriter extends SSTable
return entry;
}
- public RowIndexEntry append(AbstractCompactedRow row) throws IOException
+ public RowIndexEntry append(AbstractCompactedRow row)
{
- long currentPosition = beforeAppend(row.key);
- ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
- long dataStart = dataFile.getFilePointer();
- long dataSize = row.write(dataFile.stream);
- assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
- : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
- sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
- return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+ try
+ {
+ long currentPosition = beforeAppend(row.key);
+ ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
+ long dataStart = dataFile.getFilePointer();
+ long dataSize = row.write(dataFile.stream);
+ assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
+ : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+ sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+ return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
}
- public void append(DecoratedKey decoratedKey, ColumnFamily cf) throws IOException
+ public void append(DecoratedKey decoratedKey, ColumnFamily cf)
{
long startPosition = beforeAppend(decoratedKey);
- ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-
- // Since the columnIndex may insert RangeTombstone marker, computing
- // the size of the data is tricky.
- DataOutputBuffer buffer = new DataOutputBuffer();
-
- // build column index && write columns
- ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, cf.getColumnCount(), buffer);
- ColumnIndex index = builder.build(cf);
-
- TypeSizes typeSizes = TypeSizes.NATIVE;
- long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
- dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
-
- // Write deletion infos + column count
- DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
- dataFile.stream.writeInt(builder.writtenAtomCount());
- dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
-
- afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
-
- sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
+
+ // Since the columnIndex may insert RangeTombstone marker, computing
+ // the size of the data is tricky.
+ DataOutputBuffer buffer = new DataOutputBuffer();
+
+ // build column index && write columns
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, cf.getColumnCount(), buffer);
+ ColumnIndex index = builder.build(cf);
+
+ TypeSizes typeSizes = TypeSizes.NATIVE;
+ long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
+ dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
+
+ // Write deletion infos + column count
+ DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
+ dataFile.stream.writeInt(builder.writtenAtomCount());
+ dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
+ afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+ sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
}
+ /**
+ * @throws IOException if a read from the DataInput fails
+ * @throws FSWriteError if a write to the dataFile fails
+ */
public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
{
long currentPosition = beforeAppend(key);
- ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
- long dataStart = dataFile.getFilePointer();
-
- // write row size
- dataFile.stream.writeLong(dataSize);
+ long dataStart;
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
+ dataStart = dataFile.getFilePointer();
+ // write row size
+ dataFile.stream.writeLong(dataSize);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
- // cf data
DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
- DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
-
- // column size
int columnCount = in.readInt();
- dataFile.stream.writeInt(columnCount);
+
+ try
+ {
+ DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
+ dataFile.stream.writeInt(columnCount);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
// deserialize each column to obtain maxTimestamp and immediately serialize it.
long maxTimestamp = Long.MIN_VALUE;
@@ -245,7 +267,14 @@ public class SSTableWriter extends SSTable
tombstones.update(deletionTime);
}
maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
- columnIndexer.add(atom); // This write the atom on disk too
+ try
+ {
+ columnIndexer.add(atom); // This write the atom on disk too
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
}
assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
@@ -267,30 +296,38 @@ public class SSTableWriter extends SSTable
FileUtils.closeQuietly(iwriter);
FileUtils.closeQuietly(dataFile);
+ Set<Component> components = SSTable.componentsFor(descriptor);
try
{
- Set<Component> components = SSTable.componentsFor(descriptor);
if (!components.isEmpty())
SSTable.delete(descriptor, components);
}
- catch (Exception e)
+ catch (FSWriteError e)
{
logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+ throw e;
}
}
- public SSTableReader closeAndOpenReader() throws IOException
+ public SSTableReader closeAndOpenReader()
{
return closeAndOpenReader(System.currentTimeMillis());
}
- public SSTableReader closeAndOpenReader(long maxDataAge) throws IOException
+ public SSTableReader closeAndOpenReader(long maxDataAge)
{
// index and filter
iwriter.close();
- // main data, close will truncate if necessary
- dataFile.close();
+ try
+ {
+ // main data, close will truncate if necessary
+ dataFile.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
// write sstable statistics
SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
@@ -322,7 +359,7 @@ public class SSTableWriter extends SSTable
return sstable;
}
- private void maybeWriteDigest() throws IOException
+ private void maybeWriteDigest()
{
byte[] digest = dataFile.digest();
if (digest == null)
@@ -333,15 +370,29 @@ public class SSTableWriter extends SSTable
Descriptor newdesc = descriptor.asTemporary(false);
String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
String dataFileName = tmp[tmp.length - 1];
- out.write(String.format("%s %s", Hex.bytesToHex(digest), dataFileName).getBytes());
- out.close();
+ try
+ {
+ out.write(String.format("%s %s", Hex.bytesToHex(digest), dataFileName).getBytes());
+ out.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
}
- private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata) throws IOException
+ private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
- SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
- out.close();
+ try
+ {
+ SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
+ out.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
}
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
@@ -353,19 +404,16 @@ public class SSTableWriter extends SSTable
public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
{
- try
+ for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
{
- // do -Data last because -Data present should mean the sstable was completely renamed before crash
- for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
- FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
- FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
- // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
- FBUtilities.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
- }
- catch (IOException e)
- {
- throw new IOError(e);
+ FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
}
+
+ // do -Data last because -Data present should mean the sstable was completely renamed before crash
+ FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+ // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+ FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
}
public long getFilePointer()
@@ -389,7 +437,7 @@ public class SSTableWriter extends SSTable
public final Filter bf;
private FileMark mark;
- IndexWriter(long keyCount) throws IOException
+ IndexWriter(long keyCount)
{
indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)),
!DatabaseDescriptor.populateIOCacheOnFlush());
@@ -407,12 +455,20 @@ public class SSTableWriter extends SSTable
: FilterFactory.getFilter(keyCount, fpChance);
}
- public void append(DecoratedKey key, RowIndexEntry indexEntry) throws IOException
+ public void append(DecoratedKey key, RowIndexEntry indexEntry)
{
bf.add(key.key);
long indexPosition = indexFile.getFilePointer();
- ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
- RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream);
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
+ RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, indexFile.getPath());
+ }
+
if (logger.isTraceEnabled())
logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition);
@@ -423,20 +479,28 @@ public class SSTableWriter extends SSTable
/**
* Closes the index and bloomfilter, making the public state of this writer valid for consumption.
*/
- public void close() throws IOException
+ public void close()
{
- // bloom filter
- FileOutputStream fos = new FileOutputStream(descriptor.filenameFor(SSTable.COMPONENT_FILTER));
- DataOutputStream stream = new DataOutputStream(fos);
- FilterFactory.serialize(bf, stream, descriptor.version.filterType);
- stream.flush();
- fos.getFD().sync();
- stream.close();
-
- // index
- long position = indexFile.getFilePointer();
- indexFile.close(); // calls force
- FileUtils.truncate(indexFile.getPath(), position);
+ String path = descriptor.filenameFor(SSTable.COMPONENT_FILTER);
+ try
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStream stream = new DataOutputStream(fos);
+ FilterFactory.serialize(bf, stream, descriptor.version.filterType);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
// finalize in-memory index state
summary.complete();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 976f9f7..2c8b89e 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.io.util;
import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
public class BufferedSegmentedFile extends SegmentedFile
{
@@ -53,16 +51,9 @@ public class BufferedSegmentedFile extends SegmentedFile
public FileDataInput getSegment(long position)
{
- try
- {
- RandomAccessReader file = RandomAccessReader.open(new File(path));
- file.seek(position);
- return file;
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ RandomAccessReader file = RandomAccessReader.open(new File(path));
+ file.seek(position);
+ return file;
}
public void cleanup()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
index 2c73084..e01fd91 100644
--- a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
+++ b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
@@ -252,7 +252,7 @@ class ColumnIterator implements Iterator<Map.Entry<ByteBuffer, IColumn>>
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new IOError(e); // can't throw more detailed error. can't rethrow IOException - Iterator interface next().
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 806b114..d82fbae 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,9 +17,6 @@
*/
package org.apache.cassandra.io.util;
-import java.io.IOError;
-import java.io.IOException;
-
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
@@ -57,16 +54,9 @@ public class CompressedSegmentedFile extends SegmentedFile
public FileDataInput getSegment(long position)
{
- try
- {
- RandomAccessReader file = CompressedRandomAccessReader.open(path, metadata);
- file.seek(position);
- return file;
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ RandomAccessReader file = CompressedRandomAccessReader.open(path, metadata);
+ file.seek(position);
+ return file;
}
public void cleanup()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 354a835..fec0cff 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -25,9 +25,10 @@ import java.util.Comparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.WrappedRunnable;
-
+import org.apache.cassandra.utils.CLibrary;
public class FileUtils
{
@@ -39,34 +40,79 @@ public class FileUtils
private static final DecimalFormat df = new DecimalFormat("#.##");
- public static void deleteWithConfirm(String file) throws IOException
+ public static void createHardLink(File from, File to)
+ {
+ if (to.exists())
+ throw new RuntimeException("Tried to create duplicate hard link to " + to);
+ if (!from.exists())
+ throw new RuntimeException("Tried to hard link to file that does not exist " + from);
+
+ try
+ {
+ CLibrary.createHardLink(from, to);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, to);
+ }
+ }
+
+ public static File createTempFile(String prefix, String suffix, File directory)
+ {
+ try
+ {
+ return File.createTempFile(prefix, suffix, directory);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, directory);
+ }
+ }
+
+ public static File createTempFile(String prefix, String suffix)
+ {
+ return createTempFile(prefix, suffix, new File(System.getProperty("java.io.tmpdir")));
+ }
+
+ public static void deleteWithConfirm(String file)
{
deleteWithConfirm(new File(file));
}
- public static void deleteWithConfirm(File file) throws IOException
+ public static void deleteWithConfirm(File file)
{
assert file.exists() : "attempted to delete non-existing file " + file.getName();
if (logger.isDebugEnabled())
logger.debug("Deleting " + file.getName());
if (!file.delete())
- {
- throw new IOException("Failed to delete " + file.getAbsolutePath());
- }
+ throw new FSWriteError(new IOException("Failed to delete " + file.getAbsolutePath()), file);
+ }
+
+ public static void renameWithOutConfirm(String from, String to)
+ {
+ new File(from).renameTo(new File(to));
}
- public static void renameWithConfirm(File from, File to) throws IOException
+ public static void renameWithConfirm(String from, String to)
+ {
+ renameWithConfirm(new File(from), new File(to));
+ }
+
+ public static void renameWithConfirm(File from, File to)
{
assert from.exists();
if (logger.isDebugEnabled())
logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
+ // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
+ // and Windows is picky about that.
if (!from.renameTo(to))
- throw new IOException(String.format("Failed to rename %s to %s", from.getPath(), to.getPath()));
+ throw new RuntimeException(String.format("Failed to rename %s to %s", from.getPath(), to.getPath()));
}
- public static void truncate(String path, long size) throws IOException
+ public static void truncate(String path, long size)
{
RandomAccessFile file;
+
try
{
file = new RandomAccessFile(path, "rw");
@@ -75,13 +121,18 @@ public class FileUtils
{
throw new RuntimeException(e);
}
+
try
{
file.getChannel().truncate(size);
}
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
finally
{
- file.close();
+ closeQuietly(file);
}
}
@@ -123,6 +174,30 @@ public class FileUtils
throw e;
}
+ public static String getCanonicalPath(String filename)
+ {
+ try
+ {
+ return new File(filename).getCanonicalPath();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filename);
+ }
+ }
+
+ public static String getCanonicalPath(File file)
+ {
+ try
+ {
+ return file.getCanonicalPath();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, file);
+ }
+ }
+
public static class FileComparator implements Comparator<File>
{
public int compare(File f, File f2)
@@ -131,19 +206,17 @@ public class FileUtils
}
}
- public static void createDirectory(String directory) throws IOException
+ public static void createDirectory(String directory)
{
createDirectory(new File(directory));
}
- public static void createDirectory(File directory) throws IOException
+ public static void createDirectory(File directory)
{
if (!directory.exists())
{
if (!directory.mkdirs())
- {
- throw new IOException("unable to mkdirs " + directory);
- }
+ throw new FSWriteError(new IOException("Failed to mkdirs " + directory), directory);
}
}
@@ -163,9 +236,9 @@ public class FileUtils
public static void deleteAsync(final String file)
{
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new Runnable()
{
- protected void runMayThrow() throws IOException
+ public void run()
{
deleteWithConfirm(new File(file));
}
@@ -210,9 +283,9 @@ public class FileUtils
/**
* Deletes all files and subdirectories under "dir".
* @param dir Directory to be deleted
- * @throws IOException if any part of the tree cannot be deleted
+ * @throws FSWriteError if any part of the tree cannot be deleted
*/
- public static void deleteRecursive(File dir) throws IOException
+ public static void deleteRecursive(File dir)
{
if (dir.isDirectory())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ae81a08..f3620e3 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -17,12 +17,7 @@
*/
package org.apache.cassandra.io.util;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -33,6 +28,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.FSReadError;
+
public class MmappedSegmentedFile extends SegmentedFile
{
private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
@@ -83,17 +80,10 @@ public class MmappedSegmentedFile extends SegmentedFile
}
// not mmap'd: open a braf covering the segment
- try
- {
- // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
- RandomAccessReader file = RandomAccessReader.open(new File(path));
- file.seek(position);
- return file;
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
+ RandomAccessReader file = RandomAccessReader.open(new File(path));
+ file.seek(position);
+ return file;
}
public static void initCleaner()
@@ -205,10 +195,19 @@ public class MmappedSegmentedFile extends SegmentedFile
{
int segcount = boundaries.size() - 1;
Segment[] segments = new Segment[segcount];
- RandomAccessFile raf = null;
+ RandomAccessFile raf;
+
try
{
raf = new RandomAccessFile(path, "r");
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
for (int i = 0; i < segcount; i++)
{
long start = boundaries.get(i);
@@ -221,7 +220,7 @@ public class MmappedSegmentedFile extends SegmentedFile
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSReadError(e, path);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index c7ed792..ba7587f 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -17,14 +17,11 @@
*/
package org.apache.cassandra.io.util;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.utils.CLibrary;
public class RandomAccessReader extends RandomAccessFile implements FileDataInput
@@ -61,7 +58,8 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
private final long fileLength;
- public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws IOException
+ // used in tests
+ public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws FileNotFoundException
{
super(file, "r");
@@ -74,66 +72,93 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
buffer = new byte[bufferSize];
this.skipIOCache = skipIOCache;
- fd = CLibrary.getfd(getFD());
+ try
+ {
+ fd = CLibrary.getfd(getFD());
+ }
+ catch (IOException e)
+ {
+ // fd == null, Not Supposed To Happen
+ throw new RuntimeException(e);
+ }
// we can cache file length in read-only mode
- fileLength = channel.size();
+ try
+ {
+ fileLength = channel.size();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filePath);
+ }
validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
}
- public static RandomAccessReader open(File file, boolean skipIOCache) throws IOException
+ public static RandomAccessReader open(File file, boolean skipIOCache)
{
return open(file, DEFAULT_BUFFER_SIZE, skipIOCache);
}
- public static RandomAccessReader open(File file) throws IOException
+ public static RandomAccessReader open(File file)
{
return open(file, DEFAULT_BUFFER_SIZE, false);
}
- public static RandomAccessReader open(File file, int bufferSize) throws IOException
+ public static RandomAccessReader open(File file, int bufferSize)
{
return open(file, bufferSize, false);
}
- public static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache) throws IOException
+ public static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache)
{
- return new RandomAccessReader(file, bufferSize, skipIOCache);
+ try
+ {
+ return new RandomAccessReader(file, bufferSize, skipIOCache);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
// convert open into open
- public static RandomAccessReader open(SequentialWriter writer) throws IOException
+ public static RandomAccessReader open(SequentialWriter writer)
{
return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE);
}
/**
* Read data from file starting from current currentOffset to populate buffer.
- * @throws IOException on any I/O error.
*/
- protected void reBuffer() throws IOException
+ protected void reBuffer()
{
resetBuffer();
- if (bufferOffset >= channel.size())
- return;
+ try
+ {
+ if (bufferOffset >= channel.size())
+ return;
+
+ channel.position(bufferOffset); // setting channel position
- channel.position(bufferOffset); // setting channel position
+ int read = 0;
- int read = 0;
+ while (read < buffer.length)
+ {
+ int n = super.read(buffer, read, buffer.length - read);
+ if (n < 0)
+ break;
+ read += n;
+ }
- while (read < buffer.length)
+ validBufferBytes = read;
+ bytesSinceCacheFlush += read;
+ }
+ catch (IOException e)
{
- int n = super.read(buffer, read, buffer.length - read);
- if (n < 0)
- break;
- read += n;
+ throw new FSReadError(e, filePath);
}
- validBufferBytes = read;
-
- bytesSinceCacheFlush += read;
-
if (skipIOCache && bytesSinceCacheFlush >= CACHE_FLUSH_INTERVAL_IN_BYTES)
{
// with random I/O we can't control what we are skipping so
@@ -155,7 +180,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return filePath;
}
- public void reset() throws IOException
+ public void reset()
{
seek(markedPointer);
}
@@ -173,7 +198,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return new BufferedRandomAccessFileMark(markedPointer);
}
- public void reset(FileMark mark) throws IOException
+ public void reset(FileMark mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
seek(((BufferedRandomAccessFileMark) mark).pointer);
@@ -189,14 +214,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
/**
* @return true if there is no more data to read
- * @throws IOException on any I/O error.
*/
- public boolean isEOF() throws IOException
+ public boolean isEOF()
{
return getFilePointer() == length();
}
- public long bytesRemaining() throws IOException
+ public long bytesRemaining()
{
return length() - getFilePointer();
}
@@ -213,14 +237,21 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
}
@Override
- public void close() throws IOException
+ public void close()
{
buffer = null;
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- super.close();
+ try
+ {
+ super.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filePath);
+ }
}
@Override
@@ -243,14 +274,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
}
@Override
- public void seek(long newPosition) throws IOException
+ public void seek(long newPosition)
{
if (newPosition < 0)
throw new IllegalArgumentException("new position should not be negative");
if (newPosition > length()) // it is save to call length() in read-only mode
- throw new EOFException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
- newPosition, getPath(), length()));
+ throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+ newPosition, getPath(), length()));
current = newPosition;
@@ -261,10 +292,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
@Override
// -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
- public int read() throws IOException
+ public int read()
{
if (buffer == null)
- throw new ClosedChannelException();
+ throw new AssertionError("Attempted to read from closed RAR");
if (isEOF())
return -1; // required by RandomAccessFile
@@ -278,7 +309,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
}
@Override
- public int read(byte[] buffer) throws IOException
+ public int read(byte[] buffer)
{
return read(buffer, 0, buffer.length);
}
@@ -286,10 +317,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
@Override
// -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
- public int read(byte[] buff, int offset, int length) throws IOException
+ public int read(byte[] buff, int offset, int length)
{
if (buffer == null)
- throw new ClosedChannelException();
+ throw new AssertionError("Attempted to read from closed RAR");
if (length == 0)
return 0;
@@ -315,36 +346,48 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return toCopy;
}
- public ByteBuffer readBytes(int length) throws IOException
+ public ByteBuffer readBytes(int length) throws EOFException
{
assert length >= 0 : "buffer length should not be negative: " + length;
byte[] buff = new byte[length];
- readFully(buff); // reading data buffer
+
+ try
+ {
+ readFully(buff); // reading data buffer
+ }
+ catch (EOFException e)
+ {
+ throw e;
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filePath);
+ }
return ByteBuffer.wrap(buff);
}
@Override
- public long length() throws IOException
+ public long length()
{
return fileLength;
}
@Override
- public void write(int value) throws IOException
+ public void write(int value)
{
throw new UnsupportedOperationException();
}
@Override
- public void write(byte[] buffer) throws IOException
+ public void write(byte[] buffer)
{
throw new UnsupportedOperationException();
}
@Override
- public void write(byte[] buffer, int offset, int length) throws IOException
+ public void write(byte[] buffer, int offset, int length)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 03de78b..1dfc1bc 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.util;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.IOError;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.util.Iterator;
@@ -27,6 +26,7 @@ import java.util.NoSuchElementException;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.utils.Pair;
/**
@@ -164,7 +164,7 @@ public abstract class SegmentedFile
}
catch (IOException e)
{
- throw new IOError(e);
+ throw new FSReadError(e, path);
}
return segment;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 8b78730..a80c687 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.CLibrary;
public class SequentialWriter extends OutputStream
@@ -60,9 +61,16 @@ public class SequentialWriter extends OutputStream
public final DataOutputStream stream;
private MessageDigest digest;
- public SequentialWriter(File file, int bufferSize, boolean skipIOCache) throws IOException
+ public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
{
- out = new RandomAccessFile(file, "rw");
+ try
+ {
+ out = new RandomAccessFile(file, "rw");
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
filePath = file.getAbsolutePath();
@@ -70,22 +78,31 @@ public class SequentialWriter extends OutputStream
this.skipIOCache = skipIOCache;
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
- fd = CLibrary.getfd(out.getFD());
+
+ try
+ {
+ fd = CLibrary.getfd(out.getFD());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e); // shouldn't happen
+ }
+
directoryFD = CLibrary.tryOpenDirectory(file.getParent());
stream = new DataOutputStream(this);
}
- public static SequentialWriter open(File file) throws IOException
+ public static SequentialWriter open(File file)
{
return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
}
- public static SequentialWriter open(File file, boolean skipIOCache) throws IOException
+ public static SequentialWriter open(File file, boolean skipIOCache)
{
return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
}
- public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache) throws IOException
+ public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
{
return new SequentialWriter(file, bufferSize, skipIOCache);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 7966605..1b0ae05 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -24,12 +24,11 @@ import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.xerial.snappy.SnappyInputStream;
-
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
+import org.xerial.snappy.SnappyInputStream;
public class IncomingTcpConnection extends Thread
{
@@ -76,7 +75,7 @@ public class IncomingTcpConnection extends Thread
}
catch (IOException e)
{
- logger.debug("IOError reading from socket; closing", e);
+ logger.debug("IOException reading from socket; closing", e);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index 62316c8..2d51590 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
@@ -24,28 +27,19 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
{
private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
+
public void doVerb(MessageIn<SnapshotCommand> message, String id)
{
- try
- {
- SnapshotCommand command = message.payload;
- if (command.clear_snapshot)
- Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
- else
- Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
- if (logger.isDebugEnabled())
- logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
- MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
- }
- catch (Exception ex)
- {
- throw new RuntimeException(ex);
- }
+ SnapshotCommand command = message.payload;
+ if (command.clear_snapshot)
+ Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
+ else
+ Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
+ MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4af399d..9246716 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.File;
-import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
@@ -354,9 +353,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
Thread.sleep(delay);
}
- catch (Exception ex)
+ catch (InterruptedException e)
{
- throw new IOError(ex);
+ throw new AssertionError(e);
}
Schema.instance.updateVersionAndAnnounce();
@@ -703,7 +702,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- public synchronized void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException
+ public synchronized void joinRing() throws IOException, ConfigurationException
{
if (!joined)
{
@@ -1846,34 +1845,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return stringify(Gossiper.instance.getUnreachableMembers());
}
- private static String getCanonicalPath(String filename)
- {
- try
- {
- return new File(filename).getCanonicalPath();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
public String[] getAllDataFileLocations()
{
String[] locations = DatabaseDescriptor.getAllDataFileLocations();
for (int i = 0; i < locations.length; i++)
- locations[i] = getCanonicalPath(locations[i]);
+ locations[i] = FileUtils.getCanonicalPath(locations[i]);
return locations;
}
public String getCommitLogLocation()
{
- return getCanonicalPath(DatabaseDescriptor.getCommitLogLocation());
+ return FileUtils.getCanonicalPath(DatabaseDescriptor.getCommitLogLocation());
}
public String getSavedCachesLocation()
{
- return getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
+ return FileUtils.getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
}
private List<String> stringify(Iterable<InetAddress> endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 075c24c..43d053e 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
-import com.ning.compress.lzf.LZFInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,14 +34,15 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import com.ning.compress.lzf.LZFInputStream;
public class IncomingStreamReader
{
@@ -82,6 +82,9 @@ public class IncomingStreamReader
}
}
+ /**
+ * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+ */
public void read() throws IOException
{
if (remoteFile != null)
@@ -111,6 +114,9 @@ public class IncomingStreamReader
session.closeIfFinished();
}
+ /**
+ * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+ */
private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
{
ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
@@ -139,7 +145,7 @@ public class IncomingStreamReader
{
// need to update row cache
// Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
+ SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.desc.baseFilename(), key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
// We don't expire anything so the row shouldn't be empty
assert !row.isEmpty();
@@ -164,7 +170,7 @@ public class IncomingStreamReader
}
return writer.closeAndOpenReader();
}
- catch (Exception e)
+ catch (Throwable e)
{
writer.abort();
if (e instanceof IOException)
@@ -174,7 +180,7 @@ public class IncomingStreamReader
}
}
- private void retry() throws IOException
+ private void retry()
{
/* Ask the source node to re-stream this file. */
session.retry(remoteFile);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e11838c..2c812e5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -27,8 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.MessagingService;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +37,8 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
/** each context gets its own StreamInSession. So there may be >1 Session per host */
public class StreamInSession extends AbstractStreamSession
@@ -142,7 +142,7 @@ public class StreamInSession extends AbstractStreamSession
logger.debug("ack {} sent for {}", reply, remoteFile);
}
- public void retry(PendingFile remoteFile) throws IOException
+ public void retry(PendingFile remoteFile)
{
retries++;
if (retries > DatabaseDescriptor.getMaxStreamingRetries())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index aa36958..2ade0c6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.streaming;
-import java.io.IOError;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Future;
@@ -90,11 +88,10 @@ public class StreamOut
* Flushes matching column families from the given keyspace, or all columnFamilies
* if the cf list is empty.
*/
- private static void flushSSTables(Iterable<ColumnFamilyStore> stores) throws IOException
+ private static void flushSSTables(Iterable<ColumnFamilyStore> stores)
{
logger.info("Flushing memtables for {}...", stores);
- List<Future<?>> flushes;
- flushes = new ArrayList<Future<?>>();
+ List<Future<?>> flushes = new ArrayList<Future<?>>();
for (ColumnFamilyStore cfstore : stores)
{
Future<?> flush = cfstore.forceFlush();
@@ -110,28 +107,20 @@ public class StreamOut
public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
{
assert ranges.size() > 0;
-
logger.info("Beginning transfer to {}", session.getHost());
logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
- try
- {
- flushSSTables(cfses);
- Iterable<SSTableReader> sstables = Collections.emptyList();
- for (ColumnFamilyStore cfStore : cfses)
- sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
- transferSSTables(session, sstables, ranges, type);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ flushSSTables(cfses);
+ Iterable<SSTableReader> sstables = Collections.emptyList();
+ for (ColumnFamilyStore cfStore : cfses)
+ sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
+ transferSSTables(session, sstables, ranges, type);
}
/**
* Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
* You should probably call transferRanges instead. This moreover assumes that references have been acquired on the sstables.
*/
- public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type) throws IOException
+ public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
{
List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index e695df0..504c15d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -17,19 +17,18 @@
*/
package org.apache.cassandra.streaming;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* This class manages the streaming of multiple files one after the other.
@@ -115,7 +114,7 @@ public class StreamOutSession extends AbstractStreamSession
MessagingService.instance().stream(new StreamHeader(table, getSessionId(), pf), getHost());
}
- public void startNext() throws IOException
+ public void startNext()
{
assert files.containsKey(currentFile);
files.get(currentFile).sstable.releaseReference();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 1c5ec4b..714f76a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -17,10 +17,6 @@
*/
package org.apache.cassandra.streaming;
-
-import java.io.IOError;
-import java.io.IOException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,42 +29,35 @@ public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
public void doVerb(MessageIn<StreamReply> message, String id)
{
- try
+ StreamReply reply = message.payload;
+ logger.debug("Received StreamReply {}", reply);
+ StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
+ if (session == null)
{
- StreamReply reply = message.payload;
- logger.debug("Received StreamReply {}", reply);
- StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
- if (session == null)
- {
- logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
- return;
- }
-
- switch (reply.action)
- {
- case FILE_FINISHED:
- logger.info("Successfully sent {} to {}", reply.file, message.from);
- session.validateCurrentFile(reply.file);
- session.startNext();
- break;
- case FILE_RETRY:
- session.validateCurrentFile(reply.file);
- logger.info("Need to re-stream file {} to {}", reply.file, message.from);
- session.retry();
- break;
- case SESSION_FINISHED:
- session.close(true);
- break;
- case SESSION_FAILURE:
- session.close(false);
- break;
- default:
- throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
- }
+ logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
+ return;
}
- catch (IOException ex)
+
+ switch (reply.action)
{
- throw new IOError(ex);
+ case FILE_FINISHED:
+ logger.info("Successfully sent {} to {}", reply.file, message.from);
+ session.validateCurrentFile(reply.file);
+ session.startNext();
+ break;
+ case FILE_RETRY:
+ session.validateCurrentFile(reply.file);
+ logger.info("Need to re-stream file {} to {}", reply.file, message.from);
+ session.retry();
+ break;
+ case SESSION_FINISHED:
+ session.close(true);
+ break;
+ case SESSION_FAILURE:
+ session.close(false);
+ break;
+ default:
+ throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index bff8966..c63d119 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -148,17 +148,9 @@ public class StreamingRepairTask implements Runnable
public void onSuccess()
{
if (outstanding.decrementAndGet() > 0)
- // waiting on more calls
- return;
+ return; // waiting on more calls
- try
- {
- StreamingRepairResponse.reply(taskOwner, taskId);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ StreamingRepairResponse.reply(taskOwner, taskId);
}
public void onFailure() {}
@@ -222,7 +214,7 @@ public class StreamingRepairTask implements Runnable
task.callback.onSuccess();
}
- private static void reply(InetAddress remote, UUID taskid) throws IOException
+ private static void reply(InetAddress remote, UUID taskid)
{
logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote));
MessageOut<UUID> message = new MessageOut<UUID>(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDGen.serializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 8fbaea1..8eadee8 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -89,7 +89,7 @@ public class CompressedInputStream extends InputStream
return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
}
- public void position(long position) throws IOException
+ public void position(long position)
{
assert position >= current : "stream can only read forward.";
current = position;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index c606ef5..a13440e 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -335,19 +335,12 @@ public class ByteBufferUtil
}
}
- public static void writeWithShortLength(ByteBuffer buffer, DataOutput out)
+ public static void writeWithShortLength(ByteBuffer buffer, DataOutput out) throws IOException
{
int length = buffer.remaining();
assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
- try
- {
- out.writeShort(length);
- write(buffer, out); // writing data bytes to output source
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ out.writeShort(length);
+ write(buffer, out); // writing data bytes to output source
}
public static ByteBuffer readWithLength(DataInput in) throws IOException
@@ -455,7 +448,7 @@ public class ByteBufferUtil
return new InputStream()
{
- public int read() throws IOException
+ public int read()
{
if (!copy.hasRemaining())
return -1;
@@ -464,7 +457,7 @@ public class ByteBufferUtil
}
@Override
- public int read(byte[] bytes, int off, int len) throws IOException
+ public int read(byte[] bytes, int off, int len)
{
if (!copy.hasRemaining())
return -1;
@@ -475,7 +468,7 @@ public class ByteBufferUtil
}
@Override
- public int available() throws IOException
+ public int available()
{
return copy.remaining();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 15e493f..38bf47b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -227,19 +227,6 @@ public class FBUtilities
return messageDigest.digest();
}
- public static void renameWithConfirm(String tmpFilename, String filename) throws IOException
- {
- if (!new File(tmpFilename).renameTo(new File(filename)))
- {
- throw new IOException("rename failed of " + filename);
- }
- }
-
- public static void renameWithOutConfirm(String tmpFilename, String filename) throws IOException
- {
- new File(tmpFilename).renameTo(new File(filename));
- }
-
@Deprecated
public static void serialize(TSerializer serializer, TBase struct, DataOutput out)
throws IOException
@@ -611,7 +598,7 @@ public class FBUtilities
return buffer.getData();
}
- public static RuntimeException unchecked(Exception e)
+ public static RuntimeException unchecked(Throwable e)
{
return e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
}