You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/23 05:06:40 UTC

svn commit: r828921 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilySerializer.java db/ColumnIndexer.java db/ColumnSerializer.java db/SuperColumn.java io/CompactionIterator.java io/IteratingRow.java

Author: jbellis
Date: Fri Oct 23 03:06:39 2009
New Revision: 828921

URL: http://svn.apache.org/viewvc?rev=828921&view=rev
Log:
make compaction able to recover from errors within a row
patch by jbellis; reviewed by junrao for CASSANDRA-466

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Oct 23 03:06:39 2009
@@ -51,29 +51,43 @@
      * <column count>
      * <columns, serialized individually>
     */
-    public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    public void serialize(ColumnFamily columnFamily, DataOutput dos)
     {
-        dos.writeUTF(columnFamily.name());
-        dos.writeUTF(columnFamily.type_);
-        dos.writeUTF(columnFamily.getComparatorName());
-        dos.writeUTF(columnFamily.getSubComparatorName());
+        try
+        {
+            dos.writeUTF(columnFamily.name());
+            dos.writeUTF(columnFamily.type_);
+            dos.writeUTF(columnFamily.getComparatorName());
+            dos.writeUTF(columnFamily.getSubComparatorName());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
         serializeForSSTable(columnFamily, dos);
     }
 
-    public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
     {
-        dos.writeInt(columnFamily.localDeletionTime);
-        dos.writeLong(columnFamily.markedForDeleteAt);
+        try
+        {
+            dos.writeInt(columnFamily.localDeletionTime);
+            dos.writeLong(columnFamily.markedForDeleteAt);
 
-        Collection<IColumn> columns = columnFamily.getSortedColumns();
-        dos.writeInt(columns.size());
-        for ( IColumn column : columns )
+            Collection<IColumn> columns = columnFamily.getSortedColumns();
+            dos.writeInt(columns.size());
+            for (IColumn column : columns)
+            {
+                columnFamily.getColumnSerializer().serialize(column, dos);
+            }
+        }
+        catch (IOException e)
         {
-            columnFamily.getColumnSerializer().serialize(column, dos);
+            throw new RuntimeException(e);
         }
     }
 
-    public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
     {
         ColumnIndexer.serialize(columnFamily, dos);
         serializeForSSTable(columnFamily, dos);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Fri Oct 23 03:06:39 2009
@@ -46,20 +46,27 @@
 	 * @param dos data output stream
 	 * @throws IOException
 	 */
-    public static void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    public static void serialize(ColumnFamily columnFamily, DataOutput dos)
 	{
         Collection<IColumn> columns = columnFamily.getSortedColumns();
         BloomFilter bf = createColumnBloomFilter(columns);                    
         /* Write out the bloom filter. */
-        DataOutputBuffer bufOut = new DataOutputBuffer(); 
-        BloomFilter.serializer().serialize(bf, bufOut);
-        /* write the length of the serialized bloom filter. */
-        dos.writeInt(bufOut.getLength());
-        /* write out the serialized bytes. */
-        dos.write(bufOut.getData(), 0, bufOut.getLength());
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        try
+        {
+            BloomFilter.serializer().serialize(bf, bufOut);
+            /* write the length of the serialized bloom filter. */
+            dos.writeInt(bufOut.getLength());
+            /* write out the serialized bytes. */
+            dos.write(bufOut.getData(), 0, bufOut.getLength());
 
-        /* Do the indexing */
-        doIndexing(columnFamily.getComparator(), columns, dos);
+            /* Do the indexing */
+            doIndexing(columnFamily.getComparator(), columns, dos);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
 	}
     
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Oct 23 03:06:39 2009
@@ -23,19 +23,25 @@
 
 import java.io.*;
 
-import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class ColumnSerializer implements ICompactSerializer2<IColumn>
 {
-    public static void writeName(byte[] name, DataOutput out) throws IOException
+    public static void writeName(byte[] name, DataOutput out)
     {
         int length = name.length;
         assert length <= IColumn.MAX_NAME_LENGTH;
-        out.writeByte((length >> 8) & 0xFF);
-        out.writeByte(length & 0xFF);
-        out.write(name);
+        try
+        {
+            out.writeByte((length >> 8) & 0xFF);
+            out.writeByte(length & 0xFF);
+            out.write(name);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public static byte[] readName(DataInput in) throws IOException
@@ -48,12 +54,19 @@
         return bytes;
     }
 
-    public void serialize(IColumn column, DataOutput dos) throws IOException
+    public void serialize(IColumn column, DataOutput dos)
     {
         ColumnSerializer.writeName(column.name(), dos);
-        dos.writeBoolean(column.isMarkedForDelete());
-        dos.writeLong(column.timestamp());
-        FBUtilities.writeByteArray(column.value(), dos);
+        try
+        {
+            dos.writeBoolean(column.isMarkedForDelete());
+            dos.writeLong(column.timestamp());
+            FBUtilities.writeByteArray(column.value(), dos);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public Column deserialize(DataInput dis) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Oct 23 03:06:39 2009
@@ -324,19 +324,25 @@
         return comparator;
     }
 
-    public void serialize(IColumn column, DataOutput dos) throws IOException
+    public void serialize(IColumn column, DataOutput dos)
     {
     	SuperColumn superColumn = (SuperColumn)column;
         ColumnSerializer.writeName(column.name(), dos);
-        dos.writeInt(superColumn.getLocalDeletionTime());
-        dos.writeLong(superColumn.getMarkedForDeleteAt());
-
-        Collection<IColumn> columns  = column.getSubColumns();
-        dos.writeInt(columns.size());
+        try
+        {
+            dos.writeInt(superColumn.getLocalDeletionTime());
+            dos.writeLong(superColumn.getMarkedForDeleteAt());
 
-        for ( IColumn subColumn : columns )
+            Collection<IColumn> columns = column.getSubColumns();
+            dos.writeInt(columns.size());
+            for (IColumn subColumn : columns)
+            {
+                Column.serializer().serialize(subColumn, dos);
+            }
+        }
+        catch (IOException e)
         {
-            Column.serializer().serialize(subColumn, dos);
+            throw new RuntimeException(e);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Fri Oct 23 03:06:39 2009
@@ -6,6 +6,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.collections.iterators.CollatingIterator;
 
 import org.apache.cassandra.utils.ReducingIterator;
@@ -15,6 +16,8 @@
 
 public class CompactionIterator extends ReducingIterator<IteratingRow, CompactionIterator.CompactedRow> implements Closeable
 {
+    private static Logger logger = Logger.getLogger(CompactionIterator.class);
+
     private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
     private final int gcBefore;
 
@@ -56,32 +59,30 @@
 
     protected CompactedRow getReduced()
     {
-        try
-        {
-            return getReducedRaw();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    protected CompactedRow getReducedRaw() throws IOException
-    {
+        assert rows.size() > 0;
         DataOutputBuffer buffer = new DataOutputBuffer();
         DecoratedKey key = rows.get(0).getKey();
-        assert rows.size() > 0;
 
         ColumnFamily cf = null;
         for (IteratingRow row : rows)
         {
+            ColumnFamily thisCF;
+            try
+            {
+                thisCF = row.getColumnFamily();
+            }
+            catch (IOException e)
+            {
+                logger.error("Skipping row " + key + " in " + row.getPath(), e);
+                continue;
+            }
             if (cf == null)
             {
-                cf = row.getColumnFamily();
+                cf = thisCF;
             }
             else
             {
-                cf.addAll(row.getColumnFamily());
+                cf.addAll(thisCF);
             }
         }
         rows.clear();
@@ -90,6 +91,7 @@
         if (cfPurged == null)
             return null;
         ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
+
         return new CompactedRow(key, buffer);
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=828921&r1=828920&r2=828921&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Fri Oct 23 03:06:39 2009
@@ -56,6 +56,11 @@
         return key;
     }
 
+    public String getPath()
+    {
+        return file.getPath();
+    }
+
     public void echoData(DataOutput out) throws IOException
     {
         file.seek(dataStart);