Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/18 23:38:28 UTC

[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
	src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java

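The conflicts come down to how the replayer learns the commit log format version: the cassandra-2.0 side kept a cached local "int version", while the cassandra-2.1 resolution (see the CommitLogReplayer.java hunk below) drops the local variable and asks the descriptor directly via desc.getVersion() and desc.getMessagingVersion() at each decision point. A minimal, self-contained sketch of that resolved pattern follows; SimpleDescriptor, its version constant, and the file-name layout are illustrative stand-ins, not the actual Cassandra classes.

    // Illustrative stand-in for CommitLogDescriptor; the real class lives in
    // org.apache.cassandra.db.commitlog. The "CommitLog-<version>-<id>.log"
    // layout and the VERSION_21 constant are assumptions for this sketch.
    final class SimpleDescriptor
    {
        static final int VERSION_21 = 4;

        final int version;
        final long id;

        private SimpleDescriptor(int version, long id)
        {
            this.version = version;
            this.id = id;
        }

        static SimpleDescriptor fromFileName(String name)
        {
            // e.g. "CommitLog-4-1392762838137.log" -> version 4, id 1392762838137
            String[] parts = name.replace(".log", "").split("-");
            return new SimpleDescriptor(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }

        int getVersion()
        {
            return version;
        }
    }

    class DescriptorSketch
    {
        // The resolved pattern: no cached local 'version'; query the descriptor
        // wherever a format decision is made (sync markers exist only from 2.1 on).
        static boolean hasSyncMarkers(SimpleDescriptor desc)
        {
            return desc.getVersion() >= SimpleDescriptor.VERSION_21;
        }

        public static void main(String[] args)
        {
            SimpleDescriptor desc = SimpleDescriptor.fromFileName("CommitLog-4-1392762838137.log");
            System.out.println("segment " + desc.id + ", sync markers: " + hasSyncMarkers(desc));
        }
    }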

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2681e93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2681e93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2681e93

Branch: refs/heads/cassandra-2.1
Commit: c2681e9336a92fc4e562f2b316b6a62cb5c62e98
Parents: add7356 ad3a4f8
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 19 01:37:58 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 19 01:37:58 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                      |  1 +
 .../db/commitlog/CommitLogDescriptor.java        |  6 +++++-
 .../db/commitlog/CommitLogReplayer.java          | 19 +++++++++++--------
 3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index df28b8e,56cd4ff..3bc0f87
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -227,201 -179,187 +227,204 @@@ public class CommitLogReplayer
      public void recover(File file) throws IOException
      {
          final ReplayFilter replayFilter = ReplayFilter.create();
 +        logger.info("Replaying {}", file.getPath());
          CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 -        final long segment = desc.id;
 +        final long segmentId = desc.id;
-         int version = desc.getMessagingVersion();
+         logger.info("Replaying {} (CL version {}, messaging version {})",
+                     file.getPath(),
+                     desc.getVersion(),
+                     desc.getMessagingVersion());
          RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
 +
          try
          {
              assert reader.length() <= Integer.MAX_VALUE;
-             int offset = getStartOffset(segmentId, version);
 -            int replayPosition;
 -            if (globalPosition.segment < segment)
 -            {
 -                replayPosition = 0;
 -            }
 -            else if (globalPosition.segment == segment)
 -            {
 -                replayPosition = globalPosition.position;
 -            }
 -            else
++            int offset = getStartOffset(segmentId, desc.getMessagingVersion());
 +            if (offset < 0)
              {
                  logger.debug("skipping replay of fully-flushed {}", file);
                  return;
              }
  
 -            if (logger.isDebugEnabled())
 -                logger.debug("Replaying " + file + " starting at " + replayPosition);
 -            reader.seek(replayPosition);
 -
 -            /* read the logs populate RowMutation and apply */
 -            while (!reader.isEOF())
 +            int prevEnd = 0;
 +            main: while (true)
              {
 -                if (logger.isDebugEnabled())
 -                    logger.debug("Reading mutation at " + reader.getFilePointer());
  
 -                long claimedCRC32;
 -                int serializedSize;
 -                try
 +                int end = prevEnd;
-                 if (version < CommitLogDescriptor.VERSION_21)
++                if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
 +                    end = Integer.MAX_VALUE;
 +                else
                  {
 -                    // any of the reads may hit EOF
 -                    serializedSize = reader.readInt();
 -                    if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
 -                    {
 -                        logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
 -                        break;
 -                    }
 -
 -                    // RowMutation must be at LEAST 10 bytes:
 -                    // 3 each for a non-empty Keyspace and Key (including the
 -                    // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
 -                    // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
 -                    if (serializedSize < 10)
 -                        break;
 -
 -                    long claimedSizeChecksum = reader.readLong();
 -                    checksum.reset();
 -                    if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
 -                        checksum.update(serializedSize);
 -                    else
 -                        FBUtilities.updateChecksumInt(checksum, serializedSize);
 -
 -                    if (checksum.getValue() != claimedSizeChecksum)
 -                        break; // entry wasn't synced correctly/fully. that's
 -                               // ok.
 -
 -                    if (serializedSize > buffer.length)
 -                        buffer = new byte[(int) (1.2 * serializedSize)];
 -                    reader.readFully(buffer, 0, serializedSize);
 -                    claimedCRC32 = reader.readLong();
 -                }
 -                catch (EOFException eof)
 -                {
 -                    break; // last CL entry didn't get completely written. that's ok.
 +                    do { end = readHeader(segmentId, end, reader); }
 +                    while (end < offset && end > prevEnd);
                  }
  
 -                checksum.update(buffer, 0, serializedSize);
 -                if (claimedCRC32 != checksum.getValue())
 -                {
 -                    // this entry must not have been fsynced. probably the rest is bad too,
 -                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
 -                    continue;
 -                }
 +                if (end < prevEnd)
 +                    break;
  
 -                /* deserialize the commit log entry */
 -                FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
 -                final RowMutation rm;
 -                try
 -                {
 -                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn),
 -                                                            desc.getMessagingVersion(),
 -                                                            ColumnSerializer.Flag.LOCAL);
 -                    // doublecheck that what we read is [still] valid for the current schema
 -                    for (ColumnFamily cf : rm.getColumnFamilies())
 -                        for (Column cell : cf)
 -                            cf.getComparator().validate(cell.name());
 -                }
 -                catch (UnknownColumnFamilyException ex)
 -                {
 -                    if (ex.cfId == null)
 -                        continue;
 -                    AtomicInteger i = invalidMutations.get(ex.cfId);
 -                    if (i == null)
 -                    {
 -                        i = new AtomicInteger(1);
 -                        invalidMutations.put(ex.cfId, i);
 -                    }
 -                    else
 -                        i.incrementAndGet();
 -                    continue;
 -                }
 -                catch (Throwable t)
 +                if (logger.isDebugEnabled())
 +                    logger.debug("Replaying {} between {} and {}", file, offset, end);
 +
 +                reader.seek(offset);
 +
 +                 /* read the logs populate Mutation and apply */
 +                while (reader.getPosition() < end && !reader.isEOF())
                  {
 -                    File f = File.createTempFile("mutation", "dat");
 -                    DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("Reading mutation at {}", reader.getFilePointer());
 +
 +                    long claimedCRC32;
 +                    int serializedSize;
                      try
                      {
 -                        out.write(buffer, 0, serializedSize);
 +                        // any of the reads may hit EOF
 +                        serializedSize = reader.readInt();
 +                        if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                        {
 +                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
 +                            break main;
 +                        }
 +
 +                        // Mutation must be at LEAST 10 bytes:
 +                        // 3 each for a non-empty Keyspace and Key (including the
 +                        // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
 +                        // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                        if (serializedSize < 10)
 +                            break main;
 +
 +                        long claimedSizeChecksum = reader.readLong();
 +                        checksum.reset();
-                         if (version < CommitLogDescriptor.VERSION_20)
++                        if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
 +                            checksum.update(serializedSize);
 +                        else
 +                            FBUtilities.updateChecksumInt(checksum, serializedSize);
 +
 +                        if (checksum.getValue() != claimedSizeChecksum)
 +                            break main; // entry wasn't synced correctly/fully. that's
 +                        // ok.
 +
 +                        if (serializedSize > buffer.length)
 +                            buffer = new byte[(int) (1.2 * serializedSize)];
 +                        reader.readFully(buffer, 0, serializedSize);
 +                        claimedCRC32 = reader.readLong();
                      }
 -                    finally
 +                    catch (EOFException eof)
                      {
 -                        out.close();
 +                        break main; // last CL entry didn't get completely written. that's ok.
                      }
 -                    String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
 -                                              f.getAbsolutePath());
 -                    logger.error(st, t);
 -                    continue;
 -                }
  
 -                if (logger.isDebugEnabled())
 -                    logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
 -                            + "}"));
 +                    checksum.update(buffer, 0, serializedSize);
 +                    if (claimedCRC32 != checksum.getValue())
 +                    {
 +                        // this entry must not have been fsynced. probably the rest is bad too,
 +                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
 +                        continue;
 +                    }
  
 -                final long entryLocation = reader.getFilePointer();
 -                Runnable runnable = new WrappedRunnable()
 -                {
 -                    public void runMayThrow() throws IOException
 +                    /* deserialize the commit log entry */
 +                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
 +                    final Mutation mutation;
 +                    try
                      {
-                         // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
-                         // the current version. so do make sure the CL is drained prior to upgrading a node.
-                         mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
 -                        if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
 -                            return;
 -                        if (pointInTimeExceeded(rm))
 -                            return;
 -
 -                        final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
 -
 -                        // Rebuild the row mutation, omitting column families that
 -                        //    a) the user has requested that we ignore,
 -                        //    b) have already been flushed,
 -                        // or c) are part of a cf that was dropped.
 -                        // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 -                        RowMutation newRm = null;
 -                        for (ColumnFamily columnFamily : replayFilter.filter(rm))
++                        mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
++                                                                   desc.getMessagingVersion(),
++                                                                   ColumnSerializer.Flag.LOCAL);
 +                        // doublecheck that what we read is [still] valid for the current schema
 +                        for (ColumnFamily cf : mutation.getColumnFamilies())
 +                            for (Cell cell : cf)
 +                                cf.getComparator().validate(cell.name());
 +                    }
 +                    catch (UnknownColumnFamilyException ex)
 +                    {
 +                        if (ex.cfId == null)
 +                            continue;
 +                        AtomicInteger i = invalidMutations.get(ex.cfId);
 +                        if (i == null)
                          {
 -                            if (Schema.instance.getCF(columnFamily.id()) == null)
 -                                continue; // dropped
 +                            i = new AtomicInteger(1);
 +                            invalidMutations.put(ex.cfId, i);
 +                        }
 +                        else
 +                            i.incrementAndGet();
 +                        continue;
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        File f = File.createTempFile("mutation", "dat");
 +                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
 +                        try
 +                        {
 +                            out.write(buffer, 0, serializedSize);
 +                        }
 +                        finally
 +                        {
 +                            out.close();
 +                        }
 +                        String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
 +                                                  f.getAbsolutePath());
 +                        logger.error(st, t);
 +                        continue;
 +                    }
  
 -                            ReplayPosition rp = cfPositions.get(columnFamily.id());
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
  
 -                            // replay if current segment is newer than last flushed one or,
 -                            // if it is the last known segment, if we are after the replay position
 -                            if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
 +                    final long entryLocation = reader.getFilePointer();
 +                    Runnable runnable = new WrappedRunnable()
 +                    {
 +                        public void runMayThrow() throws IOException
 +                        {
 +                            if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                                return;
 +                            if (pointInTimeExceeded(mutation))
 +                                return;
 +
 +                            final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                            // Rebuild the mutation, omitting column families that
 +                            //    a) the user has requested that we ignore,
 +                            //    b) have already been flushed,
 +                            // or c) are part of a cf that was dropped.
 +                            // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 +                            Mutation newMutation = null;
 +                            for (ColumnFamily columnFamily : replayFilter.filter(mutation))
                              {
 -                                if (newRm == null)
 -                                    newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
 -                                newRm.add(columnFamily);
 -                                replayedCount.incrementAndGet();
 +                                if (Schema.instance.getCF(columnFamily.id()) == null)
 +                                    continue; // dropped
 +
 +                                ReplayPosition rp = cfPositions.get(columnFamily.id());
 +
 +                                // replay if current segment is newer than last flushed one or,
 +                                // if it is the last known segment, if we are after the replay position
 +                                if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
 +                                {
 +                                    if (newMutation == null)
 +                                        newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                                    newMutation.add(columnFamily);
 +                                    replayedCount.incrementAndGet();
 +                                }
 +                            }
 +                            if (newMutation != null)
 +                            {
 +                                assert !newMutation.isEmpty();
 +                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
 +                                keyspacesRecovered.add(keyspace);
                              }
                          }
 -                        if (newRm != null)
 -                        {
 -                            assert !newRm.isEmpty();
 -                            Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
 -                            keyspacesRecovered.add(keyspace);
 -                        }
 +                    };
 +                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
 +                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
 +                    {
 +                        FBUtilities.waitOnFutures(futures);
 +                        futures.clear();
                      }
 -                };
 -                futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
 -                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
 -                {
 -                    FBUtilities.waitOnFutures(futures);
 -                    futures.clear();
                  }
 +
-                 if (version < CommitLogDescriptor.VERSION_21)
++                if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
 +                    break;
 +
 +                offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
 +                prevEnd = end;
              }
          }
          finally
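----------------------------------------------------------------------

For reference, the replay loop resolved above works section by section: segments written before VERSION_21 are treated as a single section ending at Integer.MAX_VALUE, while 2.1 segments are walked with readHeader() to find each sync-marker boundary, and mutations are then replayed between offset and end. Within a section, every entry is validated twice before deserialization: a checksum over the serialized size, then a CRC over the payload. The sketch below shows that per-entry validation in isolation, assuming java.util.zip.CRC32 and DataInputStream as stand-ins for the replayer's checksum object and RandomAccessReader; it collapses the real code's distinction between "stop the section" (bad size checksum, EOF) and "skip this entry" (bad payload CRC) into a single null return, and omits the pre-VERSION_20 branch that calls checksum.update(serializedSize) directly.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.zip.CRC32;

    // Sketch only: validates one commit log entry the way the VERSION_20+ branch
    // in the hunk above does. Not the actual Cassandra code.
    class EntryValidationSketch
    {
        // Mirrors FBUtilities.updateChecksumInt: feed all four bytes of the int,
        // unlike CRC32.update(int), which consumes only the low-order byte.
        static void updateChecksumInt(CRC32 checksum, int v)
        {
            checksum.update((v >>> 24) & 0xFF);
            checksum.update((v >>> 16) & 0xFF);
            checksum.update((v >>> 8) & 0xFF);
            checksum.update(v & 0xFF);
        }

        // Returns the entry payload, or null if the entry is torn or fails a checksum.
        // A partially synced tail is expected after a crash, so a failure is not an error.
        static byte[] readEntry(DataInputStream in) throws IOException
        {
            CRC32 checksum = new CRC32();
            try
            {
                // A mutation is at least 10 bytes; 0 doubles as the legacy end-of-segment marker.
                int serializedSize = in.readInt();
                if (serializedSize < 10)
                    return null;

                long claimedSizeChecksum = in.readLong();
                updateChecksumInt(checksum, serializedSize);
                if (checksum.getValue() != claimedSizeChecksum)
                    return null; // size field wasn't synced fully

                byte[] buffer = new byte[serializedSize];
                in.readFully(buffer);
                long claimedCRC32 = in.readLong();

                checksum.reset();
                checksum.update(buffer, 0, serializedSize);
                if (checksum.getValue() != claimedCRC32)
                    return null; // payload wasn't synced fully

                return buffer; // handed to Mutation.serializer.deserialize(...) in the real code
            }
            catch (EOFException eof)
            {
                return null; // last entry was cut short; that's ok
            }
        }
    }

In the actual replayer, a failed size checksum or an EOF breaks out of the current section, while a failed payload CRC merely continues to the next entry, since reads still happen on entry boundaries.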