You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/23 05:06:40 UTC
svn commit: r828921 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnFamilySerializer.java db/ColumnIndexer.java
db/ColumnSerializer.java db/SuperColumn.java io/CompactionIterator.java
io/IteratingRow.java
Author: jbellis
Date: Fri Oct 23 03:06:39 2009
New Revision: 828921
URL: http://svn.apache.org/viewvc?rev=828921&view=rev
Log:
make compaction able to recover from errors w/in a row
patch by jbellis; reviewed by junrao for CASSANDRA-466
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Oct 23 03:06:39 2009
@@ -51,29 +51,43 @@
* <column count>
* <columns, serialized individually>
*/
- public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ public void serialize(ColumnFamily columnFamily, DataOutput dos)
{
- dos.writeUTF(columnFamily.name());
- dos.writeUTF(columnFamily.type_);
- dos.writeUTF(columnFamily.getComparatorName());
- dos.writeUTF(columnFamily.getSubComparatorName());
+ try
+ {
+ dos.writeUTF(columnFamily.name());
+ dos.writeUTF(columnFamily.type_);
+ dos.writeUTF(columnFamily.getComparatorName());
+ dos.writeUTF(columnFamily.getSubComparatorName());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
serializeForSSTable(columnFamily, dos);
}
- public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
{
- dos.writeInt(columnFamily.localDeletionTime);
- dos.writeLong(columnFamily.markedForDeleteAt);
+ try
+ {
+ dos.writeInt(columnFamily.localDeletionTime);
+ dos.writeLong(columnFamily.markedForDeleteAt);
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- dos.writeInt(columns.size());
- for ( IColumn column : columns )
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
+ dos.writeInt(columns.size());
+ for (IColumn column : columns)
+ {
+ columnFamily.getColumnSerializer().serialize(column, dos);
+ }
+ }
+ catch (IOException e)
{
- columnFamily.getColumnSerializer().serialize(column, dos);
+ throw new RuntimeException(e);
}
}
- public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
{
ColumnIndexer.serialize(columnFamily, dos);
serializeForSSTable(columnFamily, dos);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Fri Oct 23 03:06:39 2009
@@ -46,20 +46,27 @@
* @param dos data output stream
* @throws IOException
*/
- public static void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
+ public static void serialize(ColumnFamily columnFamily, DataOutput dos)
{
Collection<IColumn> columns = columnFamily.getSortedColumns();
BloomFilter bf = createColumnBloomFilter(columns);
/* Write out the bloom filter. */
- DataOutputBuffer bufOut = new DataOutputBuffer();
- BloomFilter.serializer().serialize(bf, bufOut);
- /* write the length of the serialized bloom filter. */
- dos.writeInt(bufOut.getLength());
- /* write out the serialized bytes. */
- dos.write(bufOut.getData(), 0, bufOut.getLength());
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ try
+ {
+ BloomFilter.serializer().serialize(bf, bufOut);
+ /* write the length of the serialized bloom filter. */
+ dos.writeInt(bufOut.getLength());
+ /* write out the serialized bytes. */
+ dos.write(bufOut.getData(), 0, bufOut.getLength());
- /* Do the indexing */
- doIndexing(columnFamily.getComparator(), columns, dos);
+ /* Do the indexing */
+ doIndexing(columnFamily.getComparator(), columns, dos);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/**
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Oct 23 03:06:39 2009
@@ -23,19 +23,25 @@
import java.io.*;
-import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.utils.FBUtilities;
public class ColumnSerializer implements ICompactSerializer2<IColumn>
{
- public static void writeName(byte[] name, DataOutput out) throws IOException
+ public static void writeName(byte[] name, DataOutput out)
{
int length = name.length;
assert length <= IColumn.MAX_NAME_LENGTH;
- out.writeByte((length >> 8) & 0xFF);
- out.writeByte(length & 0xFF);
- out.write(name);
+ try
+ {
+ out.writeByte((length >> 8) & 0xFF);
+ out.writeByte(length & 0xFF);
+ out.write(name);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public static byte[] readName(DataInput in) throws IOException
@@ -48,12 +54,19 @@
return bytes;
}
- public void serialize(IColumn column, DataOutput dos) throws IOException
+ public void serialize(IColumn column, DataOutput dos)
{
ColumnSerializer.writeName(column.name(), dos);
- dos.writeBoolean(column.isMarkedForDelete());
- dos.writeLong(column.timestamp());
- FBUtilities.writeByteArray(column.value(), dos);
+ try
+ {
+ dos.writeBoolean(column.isMarkedForDelete());
+ dos.writeLong(column.timestamp());
+ FBUtilities.writeByteArray(column.value(), dos);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public Column deserialize(DataInput dis) throws IOException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Oct 23 03:06:39 2009
@@ -324,19 +324,25 @@
return comparator;
}
- public void serialize(IColumn column, DataOutput dos) throws IOException
+ public void serialize(IColumn column, DataOutput dos)
{
SuperColumn superColumn = (SuperColumn)column;
ColumnSerializer.writeName(column.name(), dos);
- dos.writeInt(superColumn.getLocalDeletionTime());
- dos.writeLong(superColumn.getMarkedForDeleteAt());
-
- Collection<IColumn> columns = column.getSubColumns();
- dos.writeInt(columns.size());
+ try
+ {
+ dos.writeInt(superColumn.getLocalDeletionTime());
+ dos.writeLong(superColumn.getMarkedForDeleteAt());
- for ( IColumn subColumn : columns )
+ Collection<IColumn> columns = column.getSubColumns();
+ dos.writeInt(columns.size());
+ for (IColumn subColumn : columns)
+ {
+ Column.serializer().serialize(subColumn, dos);
+ }
+ }
+ catch (IOException e)
{
- Column.serializer().serialize(subColumn, dos);
+ throw new RuntimeException(e);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Fri Oct 23 03:06:39 2009
@@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.Comparator;
+import org.apache.log4j.Logger;
import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.cassandra.utils.ReducingIterator;
@@ -15,6 +16,8 @@
public class CompactionIterator extends ReducingIterator<IteratingRow, CompactionIterator.CompactedRow> implements Closeable
{
+ private static Logger logger = Logger.getLogger(CompactionIterator.class);
+
private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
private final int gcBefore;
@@ -56,32 +59,30 @@
protected CompactedRow getReduced()
{
- try
- {
- return getReducedRaw();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- protected CompactedRow getReducedRaw() throws IOException
- {
+ assert rows.size() > 0;
DataOutputBuffer buffer = new DataOutputBuffer();
DecoratedKey key = rows.get(0).getKey();
- assert rows.size() > 0;
ColumnFamily cf = null;
for (IteratingRow row : rows)
{
+ ColumnFamily thisCF;
+ try
+ {
+ thisCF = row.getColumnFamily();
+ }
+ catch (IOException e)
+ {
+ logger.error("Skipping row " + key + " in " + row.getPath(), e);
+ continue;
+ }
if (cf == null)
{
- cf = row.getColumnFamily();
+ cf = thisCF;
}
else
{
- cf.addAll(row.getColumnFamily());
+ cf.addAll(thisCF);
}
}
rows.clear();
@@ -90,6 +91,7 @@
if (cfPurged == null)
return null;
ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
+
return new CompactedRow(key, buffer);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Fri Oct 23 03:06:39 2009
@@ -56,6 +56,11 @@
return key;
}
+ public String getPath()
+ {
+ return file.getPath();
+ }
+
public void echoData(DataOutput out) throws IOException
{
file.seek(dataStart);