Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/08/24 19:49:56 UTC

[1/2] cassandra git commit: Only use batchlog when paired materialized view replica is remote

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 a3d9e6c47 -> e76932422


Only use batchlog when paired materialized view replica is remote

Patch by tjake; reviewed by carl for CASSANDRA-10061


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7693242
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7693242
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7693242

Branch: refs/heads/cassandra-3.0
Commit: e769324220ccfb2e48063d639e378a8a34814651
Parents: 1fc3121
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Aug 12 15:55:29 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 29 ++++++++++++++++----
 2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9cfbd64..2695233 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Only use batchlog when paired materialized view replica is remote (CASSANDRA-10061)
  * Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
  * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
  * Fix sstablerepairedset (CASSANDRA-10132)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f58ac56..25789bb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
@@ -675,7 +678,8 @@ public class StorageProxy implements StorageProxyMBean
             {
                 String keyspaceName = mutation.getKeyspaceName();
                 Token tk = mutation.key().getToken();
-                List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
+                InetAddress pairedEndpoint = MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
 
                 WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
                                                                                  consistencyLevel,
@@ -684,14 +688,27 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  WriteType.BATCH,
                                                                                  cleanup);
 
-                wrappers.add(wrapper);
+                //When the local node is the paired endpoint and there are no pending endpoints,
+                //we can just apply the mutation locally.
+                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
+                    wrapper.handler.pendingEndpoints.isEmpty())
+                {
+                    mutation.apply();
+                }
+                else
+                {
+                    wrappers.add(wrapper);
+                }
             }
 
-            //Apply to local batchlog memtable in this thread
-            BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+            if (!wrappers.isEmpty())
+            {
+                //Apply to local batchlog memtable in this thread
+                BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version).apply();
 
-            // now actually perform the writes and wait for them to complete
-            asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+                // now actually perform the writes and wait for them to complete
+                asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+            }
         }
         catch (WriteTimeoutException ex)
         {
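
For readers skimming the diff: the core of this change is a new fast path in mutateMV(). When the paired view replica for a mutation is the local node and there are no pending endpoints, the mutation is applied directly and no batchlog entry is written; only mutations that actually leave the node still go through the batchlog before the asynchronous remote writes. Below is a minimal, self-contained sketch of that decision. It is illustrative only: Mutation, pairedReplicaIsLocal, hasPendingEndpoints, writeBatchlogEntry and sendToPairedReplicas are simplified stand-ins, not Cassandra's real classes or methods; only the control flow mirrors the patch above.

   import java.util.ArrayList;
   import java.util.List;

   public class BatchlogSkipSketch
   {
       interface Mutation { void apply(); }

       // Stand-ins for the real checks: MaterializedViewUtils.getViewNaturalEndpoint(...)
       // compared against FBUtilities.getBroadcastAddress(), and handler.pendingEndpoints.
       static boolean pairedReplicaIsLocal(Mutation m) { return true; }
       static boolean hasPendingEndpoints(Mutation m)  { return false; }

       static void mutateMV(List<Mutation> viewMutations)
       {
           List<Mutation> needBatchlog = new ArrayList<>();

           for (Mutation m : viewMutations)
           {
               if (pairedReplicaIsLocal(m) && !hasPendingEndpoints(m))
                   m.apply();              // fast path: apply locally, write no batchlog entry
               else
                   needBatchlog.add(m);    // remote or pending replica: keep batchlog durability
           }

           if (!needBatchlog.isEmpty())
           {
               writeBatchlogEntry(needBatchlog);     // BatchlogManager.getBatchlogMutationFor(...).apply() in the patch
               sendToPairedReplicas(needBatchlog);   // asyncWriteBatchedMutations(...) in the patch
           }
       }

       static void writeBatchlogEntry(List<Mutation> ms)   { }
       static void sendToPairedReplicas(List<Mutation> ms) { }
   }

When every paired replica happens to be local and nothing is pending, the batchlog write is skipped entirely, which is the case the ticket title describes.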


[2/2] cassandra git commit: Reuse TemporalRow when updating multiple MaterializedViews

Posted by ja...@apache.org.
Reuse TemporalRow when updating multiple MaterializedViews

Patch by tjake; reviewed by carl for CASSANDRA-10060


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fc31216
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fc31216
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fc31216

Branch: refs/heads/cassandra-3.0
Commit: 1fc31216cbf2fc4b2fa687cca20c59eb3c4246e3
Parents: a3d9e6c
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Aug 12 15:56:25 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  2 +-
 .../cassandra/db/view/MaterializedView.java     | 86 ++++++++++++++------
 .../db/view/MaterializedViewBuilder.java        |  5 +-
 .../db/view/MaterializedViewManager.java        | 10 ++-
 .../apache/cassandra/db/view/TemporalRow.java   | 58 ++++++++++++-
 .../apache/cassandra/service/StorageProxy.java  |  6 +-
 7 files changed, 133 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c866905..9cfbd64 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,8 @@
 3.0.0-beta2
+ * Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
  * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
  * Fix sstablerepairedset (CASSANDRA-10132)
 
-
 3.0.0-beta1
  * Redesign secondary index API (CASSANDRA-9459, 7771, 9041)
  * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 199cd25..f5a047f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -457,7 +457,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Create materialized view mutations from replica");
-                        cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
+                        cfs.materializedViewManager.pushViewReplicaUpdates(upd);
                     }
                     catch (Exception e)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 7337e4b..0f6cf06 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -481,26 +481,36 @@ public class MaterializedView
 
         if (command != null)
         {
-            QueryPager pager = command.getPager(null);
 
-            // Add all of the rows which were recovered from the query to the row set
-            while (!pager.isExhausted())
+            //We may have already done this work for
+            //another MV update, so check first
+
+            if (!rowSet.hasTombstonedExisting())
             {
-                try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                     PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
-                {
-                    if (!iter.hasNext())
-                        break;
+                QueryPager pager = command.getPager(null);
 
-                    try (RowIterator rowIterator = iter.next())
+                // Add all of the rows which were recovered from the query to the row set
+                while (!pager.isExhausted())
+                {
+                    try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                         PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
                     {
-                        while (rowIterator.hasNext())
+                        if (!iter.hasNext())
+                            break;
+
+                        try (RowIterator rowIterator = iter.next())
                         {
-                            Row row = rowIterator.next();
-                            rowSet.addRow(row, false);
+                            while (rowIterator.hasNext())
+                            {
+                                Row row = rowIterator.next();
+                                rowSet.addRow(row, false);
+                            }
                         }
                     }
                 }
+
+                //In case we fetched nothing, avoid re-checking on another MV update
+                rowSet.setTombstonedExisting();
             }
 
             // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
@@ -558,13 +568,11 @@ public class MaterializedView
     /**
      * @return Set of rows which are contained in the partition update {@param partition}
      */
-    private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition)
+    private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
     {
-        Set<ColumnIdentifier> columns = new HashSet<>();
-        for (ColumnDefinition def : this.columns.primaryKeyDefs)
-            columns.add(def.name);
 
-        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key);
+        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
+
         for (Row row : partition)
             rowSet.addRow(row, true);
 
@@ -572,23 +580,53 @@ public class MaterializedView
     }
 
     /**
+     * Splits the partition update up and adds the existing state to each row.
+     * This data can be reused for multiple MV updates on the same base table
+     *
+     * @param partition the mutation
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the update will already be present in the base table.
+     * @return The set of temporal rows contained in this update
+     */
+    public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
+        for (ColumnDefinition def : this.columns.primaryKeyDefs)
+            columns.add(def.name);
+
+        TemporalRow.Set rowSet = null;
+        if (existing == null)
+        {
+            rowSet = separateRows(partition, columns);
+
+            // If we are building the view, we do not want to add old values; they will always be the same
+            if (!isBuilding)
+                readLocalRows(rowSet);
+        }
+        else
+        {
+            rowSet = existing.withNewViewPrimaryKey(columns);
+        }
+
+        return rowSet;
+    }
+
+
+    /**
      * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
      *                   since all of the update will already be present in the base table.
      * @return View mutations which represent the changes necessary as long as previously created mutations for the view
      *         have been applied successfully. This is based solely on the changes that are necessary given the current
      *         state of the base table and the newly applying partition data.
      */
-    public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding)
+    public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
     {
         if (!updateAffectsView(partition))
             return null;
 
-        TemporalRow.Set rowSet = separateRows(key, partition);
-
-        // If we are building the view, we do not want to add old values; they will always be the same
-        if (!isBuilding)
-            readLocalRows(rowSet);
-
         Collection<Mutation> mutations = null;
         for (TemporalRow temporalRow : rowSet)
         {
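
Taken together, the getTemporalRowSet/createMutations split above means the costly step, splitting the partition update into rows and reading their existing state from the base table, is paid at most once per base mutation; each additional view only re-keys the already-built set. Here is a rough, self-contained sketch of that contract, where RowSet, buildAndReadLocalRows and getRowSet are simplified stand-ins for TemporalRow.Set, separateRows/readLocalRows and getTemporalRowSet, not the real classes:

   import java.util.List;

   public class RowSetReuseSketch
   {
       /** Stand-in for TemporalRow.Set: existing and new values for every row touched by the update. */
       static class RowSet
       {
           // Cheap: copy the already-loaded rows, re-keyed by another view's
           // primary key columns (withNewViewPrimaryKey in the patch).
           RowSet withNewViewPrimaryKey(List<String> viewPkColumns) { return this; }
       }

       /** Expensive: split the update into rows and read their current state from the base table. */
       static RowSet buildAndReadLocalRows(Object partitionUpdate, List<String> viewPkColumns, boolean isBuilding)
       {
           RowSet rowSet = new RowSet();   // separateRows(partition, viewPkColumns) in the patch
           if (!isBuilding)
           {
               // readLocalRows(rowSet) in the patch: query the base table for the rows'
               // current values; skipped while the view is being built, since the base
               // table already contains everything the update does.
           }
           return rowSet;
       }

       /** Mirrors the shape of getTemporalRowSet(partition, existing, isBuilding) above. */
       static RowSet getRowSet(Object partitionUpdate, RowSet existing, List<String> viewPkColumns, boolean isBuilding)
       {
           return existing == null
                ? buildAndReadLocalRows(partitionUpdate, viewPkColumns, isBuilding)
                : existing.withNewViewPrimaryKey(viewPkColumns);
       }
   }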

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
index e8842ed..5fa5a82 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -89,7 +89,10 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder
 
                try (RowIterator rowIterator = partitionIterator.next())
                {
-                   Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+                   FilteredPartition partition = FilteredPartition.create(rowIterator);
+                   TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
+
+                   Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
 
                    if (mutations != null)
                    {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
index 7f97728..5184d8d 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -150,16 +150,20 @@ public class MaterializedViewManager
      * Calculates and pushes updates to the views replicas. The replicas are determined by
      * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
      */
-    public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+    public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
         // view node.
         if (!StorageService.instance.isJoined()) return;
 
         List<Mutation> mutations = null;
+        TemporalRow.Set temporalRows = null;
         for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
         {
-            Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
+
+            temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
+
+            Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
             if (viewMutations != null && !viewMutations.isEmpty())
             {
                 if (mutations == null)
@@ -169,7 +173,7 @@ public class MaterializedViewManager
         }
         if (mutations != null)
         {
-            StorageProxy.mutateMV(key, mutations);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations);
         }
     }
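
On the caller side, pushViewReplicaUpdates now threads one TemporalRow.Set through the loop over the table's views, so only the first view pays for the local read. Condensed from the hunk above (the mutation-collection bookkeeping is elided, and `views` stands in for iterating viewsByName):

   TemporalRow.Set temporalRows = null;
   for (MaterializedView view : views)
   {
       // first iteration builds the set and reads existing base-table state;
       // later iterations re-key the same data via withNewViewPrimaryKey
       temporalRows = view.getTemporalRowSet(update, temporalRows, false);

       Collection<Mutation> viewMutations = view.createMutations(update, temporalRows, false);
       // ... accumulate viewMutations, then hand the result to
       // StorageProxy.mutateMV(update.partitionKey().getKey(), mutations)
   }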
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 00fdf48..3ba91ee 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -162,7 +162,10 @@ public class TemporalRow
     private final ColumnFamilyStore baseCfs;
     private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
     private final ByteBuffer basePartitionKey;
-    public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+    private final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+    private final Row startRow;
+    private final boolean startIsNew;
+
     public final int nowInSec;
     private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
     private int viewClusteringTtl = NO_TTL;
@@ -174,14 +177,18 @@ public class TemporalRow
         this.baseCfs = baseCfs;
         this.viewPrimaryKey = viewPrimaryKey;
         this.basePartitionKey = key;
+        this.startRow = row;
+        this.startIsNew = isNew;
         this.nowInSec = nowInSec;
-        clusteringColumns = new HashMap<>();
+
         LivenessInfo liveness = row.primaryKeyLivenessInfo();
         this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
         this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
         this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
 
         List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
+        clusteringColumns = new HashMap<>();
+
         for (int i = 0; i < clusteringDefs.size(); i++)
         {
             ColumnDefinition cdef = clusteringDefs.get(i);
@@ -371,6 +378,7 @@ public class TemporalRow
         public final DecoratedKey dk;
         private final Map<Clustering, TemporalRow> clusteringToRow;
         final int nowInSec = FBUtilities.nowInSeconds();
+        private boolean hasTombstonedExisting = false;
 
         Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
         {
@@ -400,12 +408,56 @@ public class TemporalRow
                 clusteringToRow.put(row.clustering(), temporalRow);
             }
 
-            for (Cell cell: row.cells())
+            for (Cell cell : row.cells())
             {
                 temporalRow.addCell(cell, isNew);
             }
         }
 
+        private void addRow(TemporalRow row)
+        {
+            TemporalRow newRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row.startRow, nowInSec, row.startIsNew);
+
+            TemporalRow existing = clusteringToRow.put(row.startRow.clustering(), newRow);
+            assert existing == null;
+
+
+            for (Map.Entry<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> entry : row.columnValues.entrySet())
+            {
+                for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> cellPathEntry : entry.getValue().entrySet())
+                {
+                    SortedMap<Long, TemporalCell> oldCells = cellPathEntry.getValue();
+
+                    for (Map.Entry<Long, TemporalCell> cellEntry : oldCells.entrySet())
+                    {
+                        newRow.addColumnValue(entry.getKey(), cellPathEntry.getKey(), cellEntry.getKey(),
+                                              cellEntry.getValue().ttl, cellEntry.getValue().localDeletionTime,
+                                              cellEntry.getValue().value, cellEntry.getValue().isNew);
+                    }
+                }
+            }
+        }
+
+        public TemporalRow.Set withNewViewPrimaryKey(java.util.Set<ColumnIdentifier> viewPrimaryKey)
+        {
+            TemporalRow.Set newSet = new Set(baseCfs, viewPrimaryKey, key);
+
+            for (TemporalRow row : this)
+                newSet.addRow(row);
+
+            return newSet;
+        }
+
+        public boolean hasTombstonedExisting()
+        {
+            return hasTombstonedExisting;
+        }
+
+        public void setTombstonedExisting()
+        {
+            hasTombstonedExisting = true;
+        }
+
         public int size()
         {
             return clusteringToRow.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d999ee..f58ac56 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -685,11 +685,11 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  cleanup);
 
                 wrappers.add(wrapper);
-
-                //Apply to local batchlog memtable in this thread
-                BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
             }
 
+            //Apply to local batchlog memtable in this thread
+            BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+
             // now actually perform the writes and wait for them to complete
             asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
         }