You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/08/04 16:36:05 UTC
svn commit: r982265 - in /cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilySerializer.java UnserializableColumnFamilyException.java
commitlog/CommitLog.java
Author: gdusbabek
Date: Wed Aug 4 14:36:05 2010
New Revision: 982265
URL: http://svn.apache.org/viewvc?rev=982265&view=rev
Log:
Skip CL (commit log) rows that have unrecoverable row mutations, i.e. mutations for an unknown (probably removed) column family. Patch by gdusbabek, reviewed by jbellis. CASSANDRA-1353
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=982265&r1=982264&r2=982265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Aug 4 14:36:05 2010
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.DataInput;
import java.util.Collection;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -109,7 +110,10 @@ public class ColumnFamilySerializer impl
return null;
// create a ColumnFamily based on the cf id
- ColumnFamily cf = ColumnFamily.create(dis.readInt());
+ int cfId = dis.readInt();
+ if (CFMetaData.getCF(cfId) == null)
+ throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
+ ColumnFamily cf = ColumnFamily.create(cfId);
deserializeFromSSTableNoColumns(cf, dis);
deserializeColumns(dis, cf);
return cf;
Added: cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java?rev=982265&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java Wed Aug 4 14:36:05 2010
@@ -0,0 +1,33 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class UnserializableColumnFamilyException extends IOException // raised by ColumnFamilySerializer when CFMetaData.getCF(cfId) returns null for the id read from the stream
+{
+ public final int cfId; // the column family id that could not be resolved; CommitLog replay uses it as the key when counting skipped mutations
+
+ public UnserializableColumnFamilyException(String msg, int cfId) // msg: detail message handed to IOException; cfId: the unresolved column family id
+ {
+ super(msg);
+ this.cfId = cfId; // kept as a field so a catcher can read the id directly instead of parsing msg
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=982265&r1=982264&r2=982265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Aug 4 14:36:05 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnserializableColumnFamilyException;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
@@ -184,6 +185,7 @@ public class CommitLog
Set<Table> tablesRecovered = new HashSet<Table>();
List<Future<?>> futures = new ArrayList<Future<?>>();
byte[] bytes = new byte[4096];
+ Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
for (File file : clogs)
{
@@ -255,7 +257,24 @@ public class CommitLog
/* deserialize the commit log entry */
ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
- final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ RowMutation rm = null;
+ try
+ {
+ rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ }
+ catch (UnserializableColumnFamilyException ex)
+ {
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ continue;
+ }
+
if (logger.isDebugEnabled())
logger.debug(String.format("replaying mutation for %s.%s: %s",
rm.getTable(),
@@ -266,11 +285,12 @@ public class CommitLog
final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
final long entryLocation = reader.getFilePointer();
final CommitLogHeader finalHeader = clHeader;
+ final RowMutation frm = rm;
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
- RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
+ RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
// Rebuild the row mutation, omitting column families that a) have already been flushed,
// b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
@@ -304,6 +324,9 @@ public class CommitLog
logger.info("Finished reading " + file);
}
}
+
+ for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+ logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
// wait for all the writes to finish on the mutation stage
FBUtilities.waitOnFutures(futures);