You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/08/24 19:49:56 UTC
[1/2] cassandra git commit: Only use batchlog when paired
materialized view replica is remote
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 a3d9e6c47 -> e76932422
Only use batchlog when paired materialized view replica is remote
Patch by tjake; reviewed by carl for CASSANDRA-10061
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7693242
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7693242
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7693242
Branch: refs/heads/cassandra-3.0
Commit: e769324220ccfb2e48063d639e378a8a34814651
Parents: 1fc3121
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Aug 12 15:55:29 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/StorageProxy.java | 29 ++++++++++++++++----
2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9cfbd64..2695233 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta2
+ * Only use batchlog when paired materialized view replica is remote (CASSANDRA-10061)
* Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
* Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
* Fix sstablerepairedset (CASSANDRA-10132)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f58ac56..25789bb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
@@ -675,7 +678,8 @@ public class StorageProxy implements StorageProxyMBean
{
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
- List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
+ InetAddress pairedEndpoint = MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+ List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
consistencyLevel,
@@ -684,14 +688,27 @@ public class StorageProxy implements StorageProxyMBean
WriteType.BATCH,
cleanup);
- wrappers.add(wrapper);
+ //When local node is the endpoint and there are no pending nodes we can
+ // Just apply the mutation locally.
+ if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
+ wrapper.handler.pendingEndpoints.isEmpty())
+ {
+ mutation.apply();
+ }
+ else
+ {
+ wrappers.add(wrapper);
+ }
}
- //Apply to local batchlog memtable in this thread
- BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+ if (!wrappers.isEmpty())
+ {
+ //Apply to local batchlog memtable in this thread
+ BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version).apply();
- // now actually perform the writes and wait for them to complete
- asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+ // now actually perform the writes and wait for them to complete
+ asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+ }
}
catch (WriteTimeoutException ex)
{
[2/2] cassandra git commit: Reuse TemporalRow when updating multiple
MaterializedViews
Posted by ja...@apache.org.
Reuse TemporalRow when updating multiple MaterializedViews
Patch by tjake; reviewed by carl for CASSANDRA-10060
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fc31216
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fc31216
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fc31216
Branch: refs/heads/cassandra-3.0
Commit: 1fc31216cbf2fc4b2fa687cca20c59eb3c4246e3
Parents: a3d9e6c
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Aug 12 15:56:25 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 2 +-
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
.../cassandra/db/view/MaterializedView.java | 86 ++++++++++++++------
.../db/view/MaterializedViewBuilder.java | 5 +-
.../db/view/MaterializedViewManager.java | 10 ++-
.../apache/cassandra/db/view/TemporalRow.java | 58 ++++++++++++-
.../apache/cassandra/service/StorageProxy.java | 6 +-
7 files changed, 133 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c866905..9cfbd64 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,8 @@
3.0.0-beta2
+ * Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
* Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
* Fix sstablerepairedset (CASSANDRA-10132)
-
3.0.0-beta1
* Redesign secondary index API (CASSANDRA-9459, 7771, 9041)
* Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 199cd25..f5a047f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -457,7 +457,7 @@ public class Keyspace
try
{
Tracing.trace("Create materialized view mutations from replica");
- cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
+ cfs.materializedViewManager.pushViewReplicaUpdates(upd);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 7337e4b..0f6cf06 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -481,26 +481,36 @@ public class MaterializedView
if (command != null)
{
- QueryPager pager = command.getPager(null);
- // Add all of the rows which were recovered from the query to the row set
- while (!pager.isExhausted())
+ //We may have already done this work for
+ //another MV update so check
+
+ if (!rowSet.hasTombstonedExisting())
{
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- if (!iter.hasNext())
- break;
+ QueryPager pager = command.getPager(null);
- try (RowIterator rowIterator = iter.next())
+ // Add all of the rows which were recovered from the query to the row set
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+ PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
{
- while (rowIterator.hasNext())
+ if (!iter.hasNext())
+ break;
+
+ try (RowIterator rowIterator = iter.next())
{
- Row row = rowIterator.next();
- rowSet.addRow(row, false);
+ while (rowIterator.hasNext())
+ {
+ Row row = rowIterator.next();
+ rowSet.addRow(row, false);
+ }
}
}
}
+
+ //In case we fetched nothing, avoid re-checking on another MV update
+ rowSet.setTombstonedExisting();
}
// If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
@@ -558,13 +568,11 @@ public class MaterializedView
/**
* @return Set of rows which are contained in the partition update {@param partition}
*/
- private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition)
+ private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
{
- Set<ColumnIdentifier> columns = new HashSet<>();
- for (ColumnDefinition def : this.columns.primaryKeyDefs)
- columns.add(def.name);
- TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key);
+ TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
+
for (Row row : partition)
rowSet.addRow(row, true);
@@ -572,23 +580,53 @@ public class MaterializedView
}
/**
+ * Splits the partition update up and adds the existing state to each row.
+ * This data can be reused for multiple MV updates on the same base table
+ *
+ * @param partition the mutation
+ * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+ * since all of the update will already be present in the base table.
+ * @return The set of temporal rows contained in this update
+ */
+ public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+ {
+ if (!updateAffectsView(partition))
+ return null;
+
+ Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
+ for (ColumnDefinition def : this.columns.primaryKeyDefs)
+ columns.add(def.name);
+
+ TemporalRow.Set rowSet = null;
+ if (existing == null)
+ {
+ rowSet = separateRows(partition, columns);
+
+ // If we are building the view, we do not want to add old values; they will always be the same
+ if (!isBuilding)
+ readLocalRows(rowSet);
+ }
+ else
+ {
+ rowSet = existing.withNewViewPrimaryKey(columns);
+ }
+
+ return rowSet;
+ }
+
+
+ /**
* @param isBuilding If the view is currently being built, we do not query the values which are already stored,
* since all of the update will already be present in the base table.
* @return View mutations which represent the changes necessary as long as previously created mutations for the view
* have been applied successfully. This is based solely on the changes that are necessary given the current
* state of the base table and the newly applying partition data.
*/
- public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding)
+ public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
{
if (!updateAffectsView(partition))
return null;
- TemporalRow.Set rowSet = separateRows(key, partition);
-
- // If we are building the view, we do not want to add old values; they will always be the same
- if (!isBuilding)
- readLocalRows(rowSet);
-
Collection<Mutation> mutations = null;
for (TemporalRow temporalRow : rowSet)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
index e8842ed..5fa5a82 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -89,7 +89,10 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder
try (RowIterator rowIterator = partitionIterator.next())
{
- Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+ FilteredPartition partition = FilteredPartition.create(rowIterator);
+ TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
+
+ Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
if (mutations != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
index 7f97728..5184d8d 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -150,16 +150,20 @@ public class MaterializedViewManager
* Calculates and pushes updates to the views replicas. The replicas are determined by
* {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
*/
- public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+ public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
{
// This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
// view node.
if (!StorageService.instance.isJoined()) return;
List<Mutation> mutations = null;
+ TemporalRow.Set temporalRows = null;
for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
{
- Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
+
+ temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
+
+ Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
if (viewMutations != null && !viewMutations.isEmpty())
{
if (mutations == null)
@@ -169,7 +173,7 @@ public class MaterializedViewManager
}
if (mutations != null)
{
- StorageProxy.mutateMV(key, mutations);
+ StorageProxy.mutateMV(update.partitionKey().getKey(), mutations);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 00fdf48..3ba91ee 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -162,7 +162,10 @@ public class TemporalRow
private final ColumnFamilyStore baseCfs;
private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
private final ByteBuffer basePartitionKey;
- public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+ private final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+ private final Row startRow;
+ private final boolean startIsNew;
+
public final int nowInSec;
private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
private int viewClusteringTtl = NO_TTL;
@@ -174,14 +177,18 @@ public class TemporalRow
this.baseCfs = baseCfs;
this.viewPrimaryKey = viewPrimaryKey;
this.basePartitionKey = key;
+ this.startRow = row;
+ this.startIsNew = isNew;
this.nowInSec = nowInSec;
- clusteringColumns = new HashMap<>();
+
LivenessInfo liveness = row.primaryKeyLivenessInfo();
this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
+ clusteringColumns = new HashMap<>();
+
for (int i = 0; i < clusteringDefs.size(); i++)
{
ColumnDefinition cdef = clusteringDefs.get(i);
@@ -371,6 +378,7 @@ public class TemporalRow
public final DecoratedKey dk;
private final Map<Clustering, TemporalRow> clusteringToRow;
final int nowInSec = FBUtilities.nowInSeconds();
+ private boolean hasTombstonedExisting = false;
Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
{
@@ -400,12 +408,56 @@ public class TemporalRow
clusteringToRow.put(row.clustering(), temporalRow);
}
- for (Cell cell: row.cells())
+ for (Cell cell : row.cells())
{
temporalRow.addCell(cell, isNew);
}
}
+ private void addRow(TemporalRow row)
+ {
+ TemporalRow newRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row.startRow, nowInSec, row.startIsNew);
+
+ TemporalRow existing = clusteringToRow.put(row.startRow.clustering(), newRow);
+ assert existing == null;
+
+
+ for (Map.Entry<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> entry : row.columnValues.entrySet())
+ {
+ for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> cellPathEntry : entry.getValue().entrySet())
+ {
+ SortedMap<Long, TemporalCell> oldCells = cellPathEntry.getValue();
+
+ for (Map.Entry<Long, TemporalCell> cellEntry : oldCells.entrySet())
+ {
+ newRow.addColumnValue(entry.getKey(), cellPathEntry.getKey(), cellEntry.getKey(),
+ cellEntry.getValue().ttl, cellEntry.getValue().localDeletionTime,
+ cellEntry.getValue().value, cellEntry.getValue().isNew);
+ }
+ }
+ }
+ }
+
+ public TemporalRow.Set withNewViewPrimaryKey(java.util.Set<ColumnIdentifier> viewPrimaryKey)
+ {
+ TemporalRow.Set newSet = new Set(baseCfs, viewPrimaryKey, key);
+
+ for (TemporalRow row : this)
+ newSet.addRow(row);
+
+ return newSet;
+ }
+
+ public boolean hasTombstonedExisting()
+ {
+ return hasTombstonedExisting;
+ }
+
+ public void setTombstonedExisting()
+ {
+ hasTombstonedExisting = true;
+ }
+
public int size()
{
return clusteringToRow.size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d999ee..f58ac56 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -685,11 +685,11 @@ public class StorageProxy implements StorageProxyMBean
cleanup);
wrappers.add(wrapper);
-
- //Apply to local batchlog memtable in this thread
- BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
}
+ //Apply to local batchlog memtable in this thread
+ BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+
// now actually perform the writes and wait for them to complete
asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
}