You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/11/17 00:07:50 UTC
[3/6] git commit: Fix endless loop/compaction of schema_* CFs due to
broken timestamps patch by Pavel Yaskevich;
reviewed by Brandon Williams for CASSANDRA-4880
Fix endless loop/compaction of schema_* CFs due to broken timestamps
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4880
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd1633ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd1633ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd1633ba
Branch: refs/heads/cassandra-1.2
Commit: dd1633ba0bf0e15547f5b7048271ab7334e862f0
Parents: caeee7c
Author: Pavel Yaskevich <py...@twitter.com>
Authored: Fri Nov 16 14:23:24 2012 -0800
Committer: Pavel Yaskevich <py...@twitter.com>
Committed: Fri Nov 16 14:31:18 2012 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/DefsTable.java | 29 +++++++++++----
src/java/org/apache/cassandra/db/RowMutation.java | 3 ++
.../org/apache/cassandra/net/MessagingService.java | 12 +++---
.../apache/cassandra/service/MigrationManager.java | 9 +++--
5 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b80c60f..2ed9666 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
* Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
* Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
* Allow static CF definition with compact storage (CASSANDRA-4910)
+ * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880)
1.1.6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 2e4e5d3..4d6b574 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -174,7 +174,7 @@ public class DefsTable
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(columnFamily);
boolean needsCleanup = false;
- long timestamp = FBUtilities.timestampMicros();
+ Date now = new Date();
List<Row> rows = SystemTable.serializedSchema(columnFamily);
@@ -186,11 +186,24 @@ public class DefsTable
for (IColumn column : row.cf.columns)
{
- if (column.timestamp() > timestamp)
+ Date columnDate = new Date(column.timestamp());
+
+ if (columnDate.after(now))
+ {
+ Date micros = new Date(column.timestamp() / 1000); // assume that it was in micros
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(micros);
+
+ if ((micros.before(now) && calendar.get(Calendar.YEAR) == 1970) || micros.after(now))
+ {
+ needsCleanup = true;
+ break row_check_loop;
+ }
+ }
+ else // millis and we have to fix it to micros
{
needsCleanup = true;
- // exit the loop on first found timestamp mismatch as we know that it
- // wouldn't be only one column/row that we would have to fix anyway
break row_check_loop;
}
}
@@ -214,6 +227,8 @@ public class DefsTable
throw new AssertionError(e);
}
+ long microTimestamp = now.getTime() * 1000;
+
for (Row row : rows)
{
if (invalidSchemaRow(row))
@@ -224,7 +239,7 @@ public class DefsTable
for (IColumn column : row.cf.columns)
{
if (column.isLive())
- mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), timestamp);
+ mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), microTimestamp);
}
mutation.apply();
@@ -315,9 +330,9 @@ public class DefsTable
*/
public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException
{
- if (version < MessagingService.VERSION_11)
+ if (version < MessagingService.VERSION_117)
{
- logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first.");
+ logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1.6, please update first.");
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 3a05df9..16fa4be 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -483,6 +483,9 @@ public class RowMutation implements IMutation, MessageProducer
cf.addColumn(new Column(column.name(), column.value(), now));
}
+ if (cf.isMarkedForDelete() && cf.isEmpty())
+ continue;
+
fixedModifications.put(modification.getKey(), cf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b2649f9..7974e6c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -28,7 +28,6 @@ import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,7 +36,6 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,11 +65,13 @@ public final class MessagingService implements MessagingServiceMBean
public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
// 8 bits version, so don't waste versions
- public static final int VERSION_07 = 1;
+ public static final int VERSION_07 = 1;
public static final int VERSION_080 = 2;
- public static final int VERSION_10 = 3;
- public static final int VERSION_11 = 4;
- public static final int version_ = VERSION_11;
+ public static final int VERSION_10 = 3;
+ public static final int VERSION_11 = 4;
+ public static final int VERSION_117 = 5;
+
+ public static final int version_ = VERSION_117;
static SerializerType serializerType_ = SerializerType.BINARY;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 446bb5c..973b190 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -94,8 +94,8 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
private static void rectifySchema(UUID theirVersion, final InetAddress endpoint)
{
- // Can't request migrations from nodes with versions younger than 1.1
- if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11)
+ // Can't request migrations from nodes with versions younger than 1.1.7
+ if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117)
return;
if (Schema.instance.getVersion().equals(theirVersion))
@@ -341,11 +341,12 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
// force migration is there are nodes around, first of all
- // check if there are nodes with versions >= 1.1 to request migrations from,
+ // check if there are nodes with versions >= 1.1.7 to request migrations from,
// because migration format of the nodes with versions < 1.1 is incompatible with older versions
+ // and due to broken timestamps in versions prior to 1.1.7
for (InetAddress node : liveEndpoints)
{
- if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_11)
+ if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_117)
{
if (logger.isDebugEnabled())
logger.debug("Requesting schema from " + node);