You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2013/09/15 01:06:57 UTC
git commit: Fix CommitLogReplayer date time issue patch by Vijay ;
reviewed by jbellis for CASSANDRA-5909
Updated Branches:
refs/heads/cassandra-1.2 b281dd15d -> 742e5baf6
Fix CommitLogReplayer date time issue
patch by Vijay ; reviewed by jbellis for CASSANDRA-5909
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/742e5baf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/742e5baf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/742e5baf
Branch: refs/heads/cassandra-1.2
Commit: 742e5baf6311a1141b42bec0b3c3dc2ff19fa376
Parents: b281dd1
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Sat Sep 14 16:05:54 2013 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Sat Sep 14 16:05:54 2013 -0700
----------------------------------------------------------------------
conf/commitlog_archiving.properties | 7 +++--
.../db/commitlog/CommitLogArchiver.java | 12 +++++++-
.../db/commitlog/CommitLogReplayer.java | 2 +-
test/conf/commitlog_archiving.properties | 19 +++++++++++++
.../cassandra/db/RecoveryManagerTest.java | 29 ++++++++++++++++++++
5 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/conf/commitlog_archiving.properties
----------------------------------------------------------------------
diff --git a/conf/commitlog_archiving.properties b/conf/commitlog_archiving.properties
index 23adaeb..b19cfed 100644
--- a/conf/commitlog_archiving.properties
+++ b/conf/commitlog_archiving.properties
@@ -43,8 +43,8 @@ restore_command=
# Directory to scan the recovery files in.
restore_directories=
-# Restore mutations created up to and including this timestamp.
-# Format: 2012-04-31 20:43:12
+# Restore mutations created up to and including this timestamp in GMT.
+# Format: yyyy:MM:dd HH:mm:ss (2012:04:31 20:43:12)
#
# Note! Recovery will stop when the first client-supplied timestamp
# greater than this time is encountered. Since the order Cassandra
@@ -52,3 +52,6 @@ restore_directories=
# this may leave some mutations with timestamps earlier than the
# point-in-time unrecovered.
restore_point_in_time=
+
+# precision of the timestamp used in the inserts (MILLISECONDS, MICROSECONDS, ...)
+precision=MILLISECONDS
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index e9d850a..78026a4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -27,6 +27,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.Properties;
+import java.util.TimeZone;
import java.util.concurrent.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
@@ -43,12 +44,19 @@ import com.google.common.base.Strings;
public class CommitLogArchiver
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class);
+ public static final SimpleDateFormat format = new SimpleDateFormat("yyyy:MM:dd HH:mm:ss");
+ static
+ {
+ format.setTimeZone(TimeZone.getTimeZone("GMT"));
+ }
+
public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>();
public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("commitlog_archiver");
private final String archiveCommand;
private final String restoreCommand;
private final String restoreDirectories;
public final long restorePointInTime;
+ public final TimeUnit precision;
public CommitLogArchiver()
{
@@ -65,6 +73,7 @@ public class CommitLogArchiver
restoreCommand = null;
restoreDirectories = null;
restorePointInTime = Long.MAX_VALUE;
+ precision = TimeUnit.MILLISECONDS;
}
else
{
@@ -73,9 +82,10 @@ public class CommitLogArchiver
restoreCommand = commitlog_commands.getProperty("restore_command");
restoreDirectories = commitlog_commands.getProperty("restore_directories");
String targetTime = commitlog_commands.getProperty("restore_point_in_time");
+ precision = TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MILLISECONDS"));
try
{
- restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : new SimpleDateFormat("yyyy:MM:dd HH:mm:ss").parse(targetTime).getTime();
+ restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : format.parse(targetTime).getTime();
}
catch (ParseException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 6b401fb..796ab5b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -281,7 +281,7 @@ public class CommitLogReplayer
for (ColumnFamily families : frm.getColumnFamilies())
{
- if (families.maxTimestamp() > restoreTarget)
+ if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget)
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/test/conf/commitlog_archiving.properties
----------------------------------------------------------------------
diff --git a/test/conf/commitlog_archiving.properties b/test/conf/commitlog_archiving.properties
new file mode 100644
index 0000000..aaf6bd1
--- /dev/null
+++ b/test/conf/commitlog_archiving.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+restore_point_in_time=2112:12:12 12:12:12
+precision=MICROSECONDS
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 68c0b37..9c17d80 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -19,13 +19,17 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.Date;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.cassandra.Util;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogArchiver;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.column;
@@ -101,4 +105,29 @@ public class RecoveryManagerTest extends SchemaLoader
assert c != null;
assert ((CounterColumn)c).total() == 10L;
}
+
+ @Test
+ public void testRecoverPIT() throws Exception
+ {
+ Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
+ long timeMS = date.getTime() - 5000;
+
+ Table keyspace1 = Table.open("Keyspace1");
+ DecoratedKey dk = Util.dk("dkey");
+ for (int i = 0; i < 10; ++i)
+ {
+ long ts = TimeUnit.MILLISECONDS.toMicros(timeMS + (i * 1000));
+ ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
+ cf.addColumn(column("name-" + i, "value", ts));
+ RowMutation rm = new RowMutation("Keyspace1", dk.key);
+ rm.add(cf);
+ rm.apply();
+ }
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+ CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
+ CommitLog.instance.recover();
+
+ ColumnFamily cf = Util.getColumnFamily(keyspace1, dk, "Standard1");
+ Assert.assertEquals(6, cf.getColumnCount());
+ }
}