You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/08/04 16:36:05 UTC

svn commit: r982265 - in /cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilySerializer.java UnserializableColumnFamilyException.java commitlog/CommitLog.java

Author: gdusbabek
Date: Wed Aug  4 14:36:05 2010
New Revision: 982265

URL: http://svn.apache.org/viewvc?rev=982265&view=rev
Log:
skip CL rows that have unrecoverable row mutations. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1353

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=982265&r1=982264&r2=982265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Aug  4 14:36:05 2010
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.util.Collection;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -109,7 +110,10 @@ public class ColumnFamilySerializer impl
             return null;
 
         // create a ColumnFamily based on the cf id
-        ColumnFamily cf = ColumnFamily.create(dis.readInt());
+        int cfId = dis.readInt();
+        if (CFMetaData.getCF(cfId) == null)
+            throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
+        ColumnFamily cf = ColumnFamily.create(cfId);
         deserializeFromSSTableNoColumns(cf, dis);
         deserializeColumns(dis, cf);
         return cf;

Added: cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java?rev=982265&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java Wed Aug  4 14:36:05 2010
@@ -0,0 +1,33 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class UnserializableColumnFamilyException extends IOException // signals that serialized column family data refers to a cf id that could not be resolved
+{
+    public final int cfId; // the column family id that could not be resolved; public so handlers can read it directly
+    
+    public UnserializableColumnFamilyException(String msg, int cfId) // msg: detail message handed to IOException; cfId: the unresolved column family id
+    {
+        super(msg);
+        this.cfId = cfId;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=982265&r1=982264&r2=982265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Aug  4 14:36:05 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnserializableColumnFamilyException;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
@@ -184,6 +185,7 @@ public class CommitLog
         Set<Table> tablesRecovered = new HashSet<Table>();
         List<Future<?>> futures = new ArrayList<Future<?>>();
         byte[] bytes = new byte[4096];
+        Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
 
         for (File file : clogs)
         {
@@ -255,7 +257,24 @@ public class CommitLog
 
                     /* deserialize the commit log entry */
                     ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
-                    final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                    RowMutation rm = null;
+                    try
+                    {
+                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                    }
+                    catch (UnserializableColumnFamilyException ex)
+                    {
+                        AtomicInteger i = invalidMutations.get(ex.cfId);
+                        if (i == null)
+                        {
+                            i = new AtomicInteger(1);
+                            invalidMutations.put(ex.cfId, i);
+                        }
+                        else
+                            i.incrementAndGet();
+                        continue;
+                    }
+                    
                     if (logger.isDebugEnabled())
                         logger.debug(String.format("replaying mutation for %s.%s: %s",
                                                     rm.getTable(),
@@ -266,11 +285,12 @@ public class CommitLog
                     final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
                     final long entryLocation = reader.getFilePointer();
                     final CommitLogHeader finalHeader = clHeader;
+                    final RowMutation frm = rm;
                     Runnable runnable = new WrappedRunnable()
                     {
                         public void runMayThrow() throws IOException
                         {
-                            RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
+                            RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
 
                             // Rebuild the row mutation, omitting column families that a) have already been flushed,
                             // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
@@ -304,6 +324,9 @@ public class CommitLog
                 logger.info("Finished reading " + file);
             }
         }
+        
+        for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
 
         // wait for all the writes to finish on the mutation stage
         FBUtilities.waitOnFutures(futures);