You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/11/17 00:08:26 UTC

[5/8] git commit: Fix endless loop/compaction of schema_* CFs due to broken timestamps; patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4880

Fix endless loop/compaction of schema_* CFs due to broken timestamps
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4880


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd1633ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd1633ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd1633ba

Branch: refs/heads/trunk
Commit: dd1633ba0bf0e15547f5b7048271ab7334e862f0
Parents: caeee7c
Author: Pavel Yaskevich <py...@twitter.com>
Authored: Fri Nov 16 14:23:24 2012 -0800
Committer: Pavel Yaskevich <py...@twitter.com>
Committed: Fri Nov 16 14:31:18 2012 -0800

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/DefsTable.java    |   29 +++++++++++----
 src/java/org/apache/cassandra/db/RowMutation.java  |    3 ++
 .../org/apache/cassandra/net/MessagingService.java |   12 +++---
 .../apache/cassandra/service/MigrationManager.java |    9 +++--
 5 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b80c60f..2ed9666 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
  * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
  * Allow static CF definition with compact storage (CASSANDRA-4910)
+ * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880)
 
 
 1.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 2e4e5d3..4d6b574 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -174,7 +174,7 @@ public class DefsTable
         ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(columnFamily);
 
         boolean needsCleanup = false;
-        long timestamp = FBUtilities.timestampMicros();
+        Date now = new Date();
 
         List<Row> rows = SystemTable.serializedSchema(columnFamily);
 
@@ -186,11 +186,24 @@ public class DefsTable
 
             for (IColumn column : row.cf.columns)
             {
-                if (column.timestamp() > timestamp)
+                Date columnDate = new Date(column.timestamp());
+
+                if (columnDate.after(now))
+                {
+                    Date micros = new Date(column.timestamp() / 1000); // assume that it was in micros
+
+                    Calendar calendar = Calendar.getInstance();
+                    calendar.setTime(micros);
+
+                    if ((micros.before(now) && calendar.get(Calendar.YEAR) == 1970) || micros.after(now))
+                    {
+                        needsCleanup = true;
+                        break row_check_loop;
+                    }
+                }
+                else // millis and we have to fix it to micros
                 {
                     needsCleanup = true;
-                    // exit the loop on first found timestamp mismatch as we know that it
-                    // wouldn't be only one column/row that we would have to fix anyway
                     break row_check_loop;
                 }
             }
@@ -214,6 +227,8 @@ public class DefsTable
             throw new AssertionError(e);
         }
 
+        long microTimestamp = now.getTime() * 1000;
+
         for (Row row : rows)
         {
             if (invalidSchemaRow(row))
@@ -224,7 +239,7 @@ public class DefsTable
             for (IColumn column : row.cf.columns)
             {
                 if (column.isLive())
-                    mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), timestamp);
+                    mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), microTimestamp);
             }
 
             mutation.apply();
@@ -315,9 +330,9 @@ public class DefsTable
      */
     public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException
     {
-        if (version < MessagingService.VERSION_11)
+        if (version < MessagingService.VERSION_117)
         {
-            logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first.");
+            logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1.6, please update first.");
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 3a05df9..16fa4be 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -483,6 +483,9 @@ public class RowMutation implements IMutation, MessageProducer
                         cf.addColumn(new Column(column.name(), column.value(), now));
                 }
 
+                if (cf.isMarkedForDelete() && cf.isEmpty())
+                    continue;
+
                 fixedModifications.put(modification.getKey(), cf);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b2649f9..7974e6c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -28,7 +28,6 @@ import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -37,7 +36,6 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,11 +65,13 @@ public final class MessagingService implements MessagingServiceMBean
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
-    public static final int VERSION_07 = 1;
+    public static final int VERSION_07  = 1;
     public static final int VERSION_080 = 2;
-    public static final int VERSION_10 = 3;
-    public static final int VERSION_11 = 4;
-    public static final int version_ = VERSION_11;
+    public static final int VERSION_10  = 3;
+    public static final int VERSION_11  = 4;
+    public static final int VERSION_117 = 5;
+
+    public static final int version_ = VERSION_117;
 
     static SerializerType serializerType_ = SerializerType.BINARY;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 446bb5c..973b190 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -94,8 +94,8 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     private static void rectifySchema(UUID theirVersion, final InetAddress endpoint)
     {
-        // Can't request migrations from nodes with versions younger than 1.1
-        if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11)
+        // Can't request migrations from nodes with versions older than 1.1.7
+        if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117)
             return;
 
         if (Schema.instance.getVersion().equals(theirVersion))
@@ -341,11 +341,12 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
             liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 
             // force migration is there are nodes around, first of all
-            // check if there are nodes with versions >= 1.1 to request migrations from,
+            // check if there are nodes with versions >= 1.1.7 to request migrations from,
             // because migration format of the nodes with versions < 1.1 is incompatible with older versions
+            // and due to broken timestamps in versions prior to 1.1.7
             for (InetAddress node : liveEndpoints)
             {
-                if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_11)
+                if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_117)
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Requesting schema from " + node);