You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/23 07:49:36 UTC
svn commit: r1138740 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/
test/unit/org/apache/cassandra/io/sstable/
Author: jbellis
Date: Thu Jun 23 05:49:35 2011
New Revision: 1138740
URL: http://svn.apache.org/viewvc?rev=1138740&view=rev
Log:
clean up tmpfiles after failed compaction
patch by Aaron Morton; reviewed by slebresne and Stu Hood for CASSANDRA-2468
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 23 05:49:35 2011
@@ -7,6 +7,7 @@
(CASSANDRA-2062)
* Fixed the ability to set compaction strategy in cli using create column family command (CASSANDRA-2778)
* Add startup flag to renew counter node id (CASSANDRA-2788)
+ * clean up tmp files after failed compaction (CASSANDRA-2468)
0.8.2
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 23 05:49:35 2011
@@ -454,7 +454,14 @@ public class ColumnFamilyStore implement
if (components.contains(Component.COMPACTED_MARKER) || desc.temporary)
{
- SSTable.delete(desc, components);
+ try
+ {
+ SSTable.delete(desc, components);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
continue;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 23 05:49:35 2011
@@ -224,14 +224,22 @@ public class Memtable
+ keySize // keys in data file
+ currentThroughput.get()) // data
* 1.2); // bloom filter and row index overhead
+ SSTableReader ssTable;
+ // errors when creating the writer may leave empty temp files.
SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context);
+ try
+ {
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
+ writer.append(entry.getKey(), entry.getValue());
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
- writer.append(entry.getKey(), entry.getValue());
-
- SSTableReader ssTable = writer.closeAndOpenReader();
+ ssTable = writer.closeAndOpenReader();
+ }
+ finally
+ {
+ writer.cleanupIfNecessary();
+ }
logger.info(String.format("Completed flushing %s (%d bytes)",
ssTable.getFilename(), new File(ssTable.getFilename()).length()));
return ssTable;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jun 23 05:49:35 2011
@@ -469,131 +469,142 @@ public class CompactionManager implement
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
+ // errors when creating the writer may leave empty temp files.
SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+ SSTableReader newSstable = null;
executor.beginCompaction(new ScrubInfo(dataFile, sstable));
int goodRows = 0, badRows = 0, emptyRows = 0;
- while (!dataFile.isEOF())
+ try
{
- long rowStart = dataFile.getFilePointer();
- if (logger.isDebugEnabled())
- logger.debug("Reading row at " + rowStart);
-
- DecoratedKey key = null;
- long dataSize = -1;
- try
+ while (!dataFile.isEOF())
{
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
- dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+ long rowStart = dataFile.getFilePointer();
if (logger.isDebugEnabled())
- logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- // check for null key below
- }
-
- ByteBuffer currentIndexKey = nextIndexKey;
- long nextRowPositionFromIndex;
- try
- {
- nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
- }
- catch (Throwable th)
- {
- logger.warn("Error reading index file", th);
- nextIndexKey = null;
- nextRowPositionFromIndex = dataFile.length();
- }
-
- long dataStart = dataFile.getFilePointer();
- long dataStartFromIndex = currentIndexKey == null
- ? -1
- : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
- long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
- assert currentIndexKey != null || indexFile.isEOF();
- if (logger.isDebugEnabled() && currentIndexKey != null)
- logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
+ logger.debug("Reading row at " + rowStart);
- writer.mark();
- try
- {
- if (key == null)
- throw new IOError(new IOException("Unable to read row key from data file"));
- if (dataSize > dataFile.length())
- throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
+ DecoratedKey key = null;
+ long dataSize = -1;
+ try
{
- emptyRows++;
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
}
- else
+ catch (Throwable th)
{
- writer.append(compactedRow);
- goodRows++;
+ throwIfFatal(th);
+ // check for null key below
}
- if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
- logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- logger.warn("Non-fatal error reading row (stacktrace follows)", th);
- writer.reset();
- if (currentIndexKey != null
- && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+ ByteBuffer currentIndexKey = nextIndexKey;
+ long nextRowPositionFromIndex;
+ try
+ {
+ nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+ nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+ }
+ catch (Throwable th)
{
- logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
- dataSizeFromIndex, dataStartFromIndex));
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
- try
+ logger.warn("Error reading index file", th);
+ nextIndexKey = null;
+ nextRowPositionFromIndex = dataFile.length();
+ }
+
+ long dataStart = dataFile.getFilePointer();
+ long dataStartFromIndex = currentIndexKey == null
+ ? -1
+ : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+ long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+ assert currentIndexKey != null || indexFile.isEOF();
+ if (logger.isDebugEnabled() && currentIndexKey != null)
+ logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
+
+ writer.mark();
+ try
+ {
+ if (key == null)
+ throw new IOError(new IOException("Unable to read row key from data file"));
+ if (dataSize > dataFile.length())
+ throw new IOError(new IOException("Impossible row size " + dataSize));
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ writer.append(compactedRow);
+ goodRows++;
+ }
+ if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+ logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ logger.warn("Non-fatal error reading row (stacktrace follows)", th);
+ writer.reset();
+
+ if (currentIndexKey != null
+ && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
+ logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
+ dataSizeFromIndex, dataStartFromIndex));
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+ try
{
- emptyRows++;
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ writer.append(compactedRow);
+ goodRows++;
+ }
}
- else
+ catch (Throwable th2)
{
- writer.append(compactedRow);
- goodRows++;
+ throwIfFatal(th2);
+ // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+ if (isCommutative)
+ throw new IOError(th2);
+
+ logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
+ writer.reset();
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
}
}
- catch (Throwable th2)
+ else
{
- throwIfFatal(th2);
// Skipping rows is dangerous for counters (see CASSANDRA-2759)
if (isCommutative)
- throw new IOError(th2);
+ throw new IOError(th);
- logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
- writer.reset();
- dataFile.seek(nextRowPositionFromIndex);
+ logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
+ if (currentIndexKey != null)
+ dataFile.seek(nextRowPositionFromIndex);
badRows++;
}
}
- else
- {
- // Skipping rows is dangerous for counters (see CASSANDRA-2759)
- if (isCommutative)
- throw new IOError(th);
-
- logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
- if (currentIndexKey != null)
- dataFile.seek(nextRowPositionFromIndex);
- badRows++;
- }
}
+
+ if (writer.getFilePointer() > 0)
+ newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+ }
+ finally
+ {
+ writer.cleanupIfNecessary();
}
- if (writer.getFilePointer() > 0)
+ if (newSstable != null)
{
- SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
if (badRows > 0)
@@ -652,6 +663,7 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer = null;
+ SSTableReader newSstable = null;
logger.info("Cleaning up " + sstable);
// Calculate the expected compacted filesize
@@ -691,17 +703,21 @@ public class CompactionManager implement
}
}
}
+ if (writer != null)
+ newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
}
finally
{
scanner.close();
executor.finishCompaction(ci);
+ if (writer != null)
+ writer.cleanupIfNecessary();
+ executor.finishCompaction(ci);
}
List<SSTableReader> results = new ArrayList<SSTableReader>();
- if (writer != null)
+ if (newSstable != null)
{
- SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
results.add(newSstable);
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Jun 23 05:49:35 2011
@@ -127,7 +127,8 @@ public class CompactionTask extends Abst
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- SSTableWriter writer;
+ SSTableWriter writer = null;
+ final SSTableReader ssTable;
CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -164,15 +165,17 @@ public class CompactionTask extends Abst
}
}
}
+ ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
}
finally
{
ci.close();
if (collector != null)
collector.finishCompaction(ci);
+ if (writer != null)
+ writer.cleanupIfNecessary();
}
- SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
ssTable.cacheKey(entry.getKey(), entry.getValue());
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Thu Jun 23 05:49:35 2011
@@ -55,6 +55,21 @@ public class Descriptor
public final boolean isLatestVersion;
public final boolean usesOldBloomFilter;
+ public enum TempState
+ {
+ LIVE,
+ TEMP,
+ ANY;
+
+ boolean isMatch(Descriptor descriptor)
+ {
+ assert descriptor != null;
+ if (TempState.ANY == this)
+ return true;
+ return (TempState.TEMP == this) ? descriptor.temporary : !descriptor.temporary;
+ }
+ }
+
/**
* A descriptor that assumes CURRENT_VERSION.
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu Jun 23 05:49:35 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.FilenameFilter;
-import java.io.IOError;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -137,26 +136,20 @@ public abstract class SSTable
*
* @return true if the file was deleted
*/
- public static boolean delete(Descriptor desc, Set<Component> components)
+ public static boolean delete(Descriptor desc, Set<Component> components) throws IOException
{
- try
- {
- // remove the DATA component first if it exists
- if (components.contains(Component.DATA))
- FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
- for (Component component : components)
- {
- if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
- continue;
- FileUtils.deleteWithConfirm(desc.filenameFor(component));
- }
- // remove the COMPACTED_MARKER component last if it exists
- FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
- }
- catch (IOException e)
+ // remove the DATA component first if it exists
+ if (components.contains(Component.DATA))
+ FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
+ for (Component component : components)
{
- throw new IOError(e);
+ if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
+ continue;
+ FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
+ // remove the COMPACTED_MARKER component last if it exists
+ FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
+
logger.debug("Deleted {}", desc);
return true;
}
@@ -196,7 +189,7 @@ public abstract class SSTable
/**
* Discovers existing components for the descriptor. Slow: only intended for use outside the critical path.
*/
- static Set<Component> componentsFor(final Descriptor desc, final boolean liveOnly)
+ static Set<Component> componentsFor(final Descriptor desc, final Descriptor.TempState matchState)
{
final Set<Component> components = new HashSet<Component>();
desc.directory.list(new FilenameFilter()
@@ -204,7 +197,7 @@ public abstract class SSTable
public boolean accept(File dir, String name)
{
Pair<Descriptor,Component> component = tryComponentFromFilename(dir, name);
- if (component != null && component.left.equals(desc) && (!liveOnly || !component.left.temporary))
+ if (component != null && component.left.equals(desc) && (matchState.isMatch(component.left)))
components.add(component.right);
return false;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java Thu Jun 23 05:49:35 2011
@@ -20,6 +20,7 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
+import java.io.IOError;
import java.io.IOException;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
@@ -94,7 +95,15 @@ public class SSTableDeletingReference ex
}
}
// let the remainder be cleaned up by delete
- SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+ try
+ {
+ SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+
tracker.spaceReclaimed(size);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jun 23 05:49:35 2011
@@ -138,7 +138,7 @@ public class SSTableReader extends SSTab
public static SSTableReader open(Descriptor desc) throws IOException
{
- Set<Component> components = componentsFor(desc, false);
+ Set<Component> components = componentsFor(desc, Descriptor.TempState.ANY);
return open(desc, components, DatabaseDescriptor.getCFMetaData(desc.ksname, desc.cfname), StorageService.getPartitioner());
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Jun 23 05:49:35 2011
@@ -168,6 +168,26 @@ public class SSTableWriter extends SSTab
afterAppend(decoratedKey, currentPosition);
}
+ /**
+ * Attempt to close the index writer and data file before deleting all temp components for the sstable
+ */
+ public void cleanupIfNecessary()
+ {
+ FileUtils.closeQuietly(iwriter);
+ FileUtils.closeQuietly(dataFile);
+
+ try
+ {
+ Set<Component> components = SSTable.componentsFor(descriptor, Descriptor.TempState.TEMP);
+ if (!components.isEmpty())
+ SSTable.delete(descriptor, components);
+ }
+ catch (Exception e)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+ }
+ }
+
public SSTableReader closeAndOpenReader() throws IOException
{
return closeAndOpenReader(System.currentTimeMillis());
@@ -300,26 +320,53 @@ public class SSTableWriter extends SSTab
public SSTableReader build() throws IOException
{
- if (cfs.isInvalid())
- return null;
- maybeOpenIndexer();
+ try
+ {
+ if (cfs.isInvalid())
+ return null;
+ maybeOpenIndexer();
+
+ File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ assert !ifile.exists();
+ assert !ffile.exists();
- File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
- File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
- assert !ifile.exists();
- assert !ffile.exists();
+ long estimatedRows = indexer.prepareIndexing();
- long estimatedRows = indexer.prepareIndexing();
+ // build the index and filter
+ long rows = indexer.index();
+
+ logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+ return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, Descriptor.TempState.ANY)));
+ }
+ finally
+ {
+ cleanupIfNecessary();
+ }
+ }
- // build the index and filter
- long rows = indexer.index();
+ /**
+ * Attempt to close the row indexer (which in turn closes its data file and index writer) before deleting all temp components for the sstable
+ */
+ public void cleanupIfNecessary()
+ {
+ FileUtils.closeQuietly(indexer);
- logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
- return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
+ try
+ {
+ Set<Component> components = SSTable.componentsFor(desc, Descriptor.TempState.TEMP);
+ if (!components.isEmpty())
+ SSTable.delete(desc, components);
+ }
+ catch (Exception e)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", desc), e);
+ }
}
+
}
- static class RowIndexer
+ static class RowIndexer implements Closeable
{
protected final Descriptor desc;
public final BufferedRandomAccessFile dfile;
@@ -376,7 +423,7 @@ public class SSTableWriter extends SSTab
}
}
- void close() throws IOException
+ public void close() throws IOException
{
dfile.close();
iwriter.close();
@@ -465,6 +512,11 @@ public class SSTableWriter extends SSTab
writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
return rows;
}
+
+ public String toString()
+ {
+ return "RowIndexer(" + desc + ")";
+ }
}
/*
@@ -533,7 +585,7 @@ public class SSTableWriter extends SSTab
}
@Override
- void close() throws IOException
+ public void close() throws IOException
{
super.close();
writerDfile.close();
@@ -543,7 +595,7 @@ public class SSTableWriter extends SSTab
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
*/
- static class IndexWriter
+ static class IndexWriter implements Closeable
{
private final BufferedRandomAccessFile indexFile;
public final Descriptor desc;
@@ -610,5 +662,10 @@ public class SSTableWriter extends SSTab
// we assume that if that worked then we won't be trying to reset.
indexFile.reset(mark);
}
+
+ public String toString()
+ {
+ return "IndexWriter(" + desc + ")";
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java Thu Jun 23 05:49:35 2011
@@ -92,7 +92,7 @@ public class FileUtils
}
catch (Exception e)
{
- logger_.warn("Failed closing stream", e);
+ logger_.warn("Failed closing " + c, e);
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Thu Jun 23 05:49:35 2011
@@ -75,9 +75,9 @@ public class SSTableTest extends Cleanup
ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
verifyMany(ssTable, map);
- Set<Component> live = SSTable.componentsFor(ssTable.descriptor, true);
+ Set<Component> live = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.LIVE);
assert !live.isEmpty() : "SSTable has live components";
- Set<Component> all = SSTable.componentsFor(ssTable.descriptor, false);
+ Set<Component> all = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.ANY);
assert live.equals(all) : "live components same as all components";
all.removeAll(live);
assert all.isEmpty() : "SSTable has no temp components";