You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/09/01 21:31:41 UTC
[1/2] cassandra git commit: Re-apply MV updates on commitlog replay
Repository: cassandra
Updated Branches:
refs/heads/trunk 78dcf79c7 -> 17f8788ef
Re-apply MV updates on commitlog replay
patch by tjake; reviewed by carlyeks for CASSANDRA-10164
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5868685
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5868685
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5868685
Branch: refs/heads/trunk
Commit: b58686858c632ed642ccf355f1f3a588e28b0e8a
Parents: 9c02625
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu Aug 27 13:28:04 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Sep 1 15:30:20 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 29 +++++++++++------
.../db/commitlog/CommitLogReplayer.java | 13 +++++++-
.../db/view/MaterializedViewBuilder.java | 13 +-------
.../db/view/MaterializedViewManager.java | 8 ++---
.../apache/cassandra/service/StorageProxy.java | 33 +++++++-------------
.../cassandra/streaming/StreamReceiveTask.java | 8 +++--
.../cassandra/cql3/MaterializedViewTest.java | 26 +++++++++++++++
8 files changed, 79 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6539792..88b99a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta2
+ * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
* Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
* Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
* Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f5a047f..981209c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -386,7 +386,17 @@ public class Keyspace
public void apply(Mutation mutation, boolean writeCommitLog)
{
- apply(mutation, writeCommitLog, true);
+ apply(mutation, writeCommitLog, true, false);
+ }
+
+ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ {
+ apply(mutation, writeCommitLog, updateIndexes, false);
+ }
+
+ public void applyFromCommitLog(Mutation mutation)
+ {
+ apply(mutation, false, true, true);
}
/**
@@ -396,8 +406,9 @@ public class Keyspace
* may happen concurrently, depending on the CL Executor type.
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
+ * @param isClReplay true if caller is the commitlog replayer
*/
- public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
+ public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, boolean isClReplay)
{
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
@@ -456,15 +467,15 @@ public class Keyspace
{
try
{
- Tracing.trace("Create materialized view mutations from replica");
- cfs.materializedViewManager.pushViewReplicaUpdates(upd);
+ Tracing.trace("Creating materialized view mutations from base table replica");
+ cfs.materializedViewManager.pushViewReplicaUpdates(upd, !isClReplay);
}
- catch (Exception e)
+ catch (Throwable t)
{
- if (!(e instanceof WriteTimeoutException))
- logger.warn("Encountered exception when creating materialized view mutations", e);
-
- JVMStabilityInspector.inspectThrowable(e);
+ JVMStabilityInspector.inspectThrowable(t);
+ logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s.%s",
+ upd.metadata().ksName, upd.metadata().cfName), t);
+ throw t;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 93c3026..4f50008 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -161,8 +161,19 @@ public class CommitLogReplayer
// flush replayed keyspaces
futures.clear();
+ boolean flushingSystem = false;
for (Keyspace keyspace : keyspacesRecovered)
+ {
+ if (keyspace.getName().equals(SystemKeyspace.NAME))
+ flushingSystem = true;
+
futures.addAll(keyspace.flush());
+ }
+
+ // also flush batchlog incase of any MV updates
+ if (!flushingSystem)
+ futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush());
+
FBUtilities.waitOnFutures(futures);
return replayedCount.get();
}
@@ -594,7 +605,7 @@ public class CommitLogReplayer
if (newMutation != null)
{
assert !newMutation.isEmpty();
- Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+ Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
keyspacesRecovered.add(keyspace);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
index 6083634..e23fd84 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -95,18 +95,7 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder
Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
if (mutations != null)
- {
- try
- {
- StorageProxy.mutateMV(key.getKey(), mutations);
- break;
- }
- catch (WriteTimeoutException ex)
- {
- NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
- .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name);
- }
- }
+ StorageProxy.mutateMV(key.getKey(), mutations, true);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
index e0cecf5..ac6a256 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -151,12 +151,8 @@ public class MaterializedViewManager
* Calculates and pushes updates to the views replicas. The replicas are determined by
* {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
*/
- public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+ public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
{
- // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
- // view node.
- if (!StorageService.instance.isJoined()) return;
-
List<Mutation> mutations = null;
TemporalRow.Set temporalRows = null;
for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
@@ -174,7 +170,7 @@ public class MaterializedViewManager
}
if (mutations != null)
{
- StorageProxy.mutateMV(update.partitionKey().getKey(), mutations);
+ StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 25789bb..4952959 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -652,8 +652,7 @@ public class StorageProxy implements StorageProxyMBean
*
* @param mutations the mutations to be applied across the replicas
*/
- public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations)
- throws UnavailableException, OverloadedException, WriteTimeoutException
+ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog)
{
Tracing.trace("Determining replicas for mutation");
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -693,7 +692,10 @@ public class StorageProxy implements StorageProxyMBean
if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
wrapper.handler.pendingEndpoints.isEmpty())
{
- mutation.apply();
+ if (writeCommitLog)
+ mutation.apply();
+ else
+ mutation.applyUnsafe();
}
else
{
@@ -703,31 +705,18 @@ public class StorageProxy implements StorageProxyMBean
if (!wrappers.isEmpty())
{
+ Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version);
+
//Apply to local batchlog memtable in this thread
- BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version).apply();
+ if (writeCommitLog)
+ blMutation.apply();
+ else
+ blMutation.applyUnsafe();
// now actually perform the writes and wait for them to complete
asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
}
}
- catch (WriteTimeoutException ex)
- {
- mvWriteMetrics.timeouts.mark();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
- }
- catch (UnavailableException e)
- {
- mvWriteMetrics.unavailables.mark();
- Tracing.trace("Unavailable");
- throw e;
- }
- catch (OverloadedException e)
- {
- mvWriteMetrics.unavailables.mark();
- Tracing.trace("Overloaded");
- throw e;
- }
finally
{
mvWriteMetrics.addNano(System.nanoTime() - startTime);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 52c8884..cb99654 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -146,7 +146,7 @@ public class StreamReceiveTask extends StreamTask
//We have a special path for Materialized view.
//Since the MV requires cleaning up any pre-existing state, we must put
//all partitions through the same write path as normal mutations.
- //This also ensures any 2is are also updated
+ //This also ensures any 2i's are also updated
if (hasMaterializedViews)
{
for (SSTableReader reader : readers)
@@ -157,7 +157,8 @@ public class StreamReceiveTask extends StreamTask
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- new Mutation(PartitionUpdate.fromIterator(rowIterator)).apply();
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
}
}
}
@@ -183,7 +184,10 @@ public class StreamReceiveTask extends StreamTask
//We don't keep the streamed sstables since we've applied them manually
//So we abort the txn and delete the streamed sstables
if (hasMaterializedViews)
+ {
+ cfs.forceBlockingFlush();
task.txn.abort();
+ }
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
index daa68e9..7d08a8b 100644
--- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
@@ -292,6 +292,32 @@ public class MaterializedViewTest extends CQLTester
}
@Test
+ public void testBuilderWidePartition() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "k int, " +
+ "c int, " +
+ "intval int, " +
+ "PRIMARY KEY (k, c))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+
+ for(int i = 0; i < 1024; i++)
+ execute("INSERT INTO %s (k, c, intval) VALUES (?, ?, ?)", 0, i, 0);
+
+ createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL AND intval IS NOT NULL PRIMARY KEY (intval, c, k)");
+
+
+ while (!SystemKeyspace.isViewBuilt(keyspace(), "mv"))
+ Thread.sleep(1000);
+
+ assertRows(execute("SELECT count(*) from %s WHERE k = ?", 0), row(1024L));
+ assertRows(execute("SELECT count(*) from mv WHERE intval = ?", 0), row(1024L));
+ }
+
+ @Test
public void testRangeTombstone() throws Throwable
{
createTable("CREATE TABLE %s (" +
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17f8788e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17f8788e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17f8788e
Branch: refs/heads/trunk
Commit: 17f8788ef80ff541f43141e0181efe9baea46db9
Parents: 78dcf79 b586868
Author: T Jake Luciani <ja...@apache.org>
Authored: Tue Sep 1 15:31:27 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Sep 1 15:31:27 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 29 +++++++++++------
.../db/commitlog/CommitLogReplayer.java | 13 +++++++-
.../db/view/MaterializedViewBuilder.java | 13 +-------
.../db/view/MaterializedViewManager.java | 8 ++---
.../apache/cassandra/service/StorageProxy.java | 33 +++++++-------------
.../cassandra/streaming/StreamReceiveTask.java | 8 +++--
.../cassandra/cql3/MaterializedViewTest.java | 26 +++++++++++++++
8 files changed, 79 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/17f8788e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0c94103,88b99a2..9a4b48f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
+3.2
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0.0-beta2
+ * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
* Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
* Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
* Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156)