Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/09/17 13:10:02 UTC

[cassandra] branch trunk updated: Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7c71f6  Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it
f7c71f6 is described below

commit f7c71f65c000c2c3ef7df1b034b8fdd822a396d8
Author: Aleksei Zotov <az...@gmail.com>
AuthorDate: Fri Jul 23 21:45:12 2021 +0400

    Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it
    
    patch by Aleksei Zotov; reviewed by Benjamin Lerer and Chris Lohfink for
    CASSANDRA-16806
---
 CHANGES.txt                                        |   3 +-
 doc/source/new/virtualtables.rst                   |  22 +-
 .../cassandra/cql3/statements/DeleteStatement.java |   4 +-
 .../cql3/statements/ModificationStatement.java     |   1 +
 .../cql3/statements/TruncateStatement.java         |  31 +-
 .../db/virtual/AbstractMutableVirtualTable.java    | 398 ++++++++++
 .../cassandra/db/virtual/AbstractVirtualTable.java |   8 +-
 .../apache/cassandra/db/virtual/VirtualTable.java  |   7 +-
 .../cql3/validation/entities/VirtualTableTest.java | 815 ++++++++++++++++++---
 9 files changed, 1174 insertions(+), 115 deletions(-)
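
For orientation, a sketch (not part of the patch) of the user-visible effect, written in the style of the CQLTester-based tests added below; execute is the CQLTester helper those tests use, and test_virtual_ks.vt2 is the mutable test table they define:

    // Modifications are accepted once the table's implementation opts in by extending
    // the new AbstractMutableVirtualTable and overriding the relevant hooks.
    execute("UPDATE test_virtual_ks.vt2 SET v1 = 1, v2 = 1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
    // Partition, row, range and single-column deletions map to the corresponding hooks.
    execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1'");
    execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
    // TRUNCATE is routed to VirtualTable#truncate().
    execute("TRUNCATE test_virtual_ks.vt2");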

diff --git a/CHANGES.txt b/CHANGES.txt
index 6b95485..3fb4df5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
  * Include SASI components to snapshots (CASSANDRA-15134)
  * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
  * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
@@ -38,7 +39,7 @@ Merged from 3.0:
 
 4.0.1
  * Tolerate missing DNS entry when completing a host replacement (CASSANDRA-16873)
- * Harden PrunableArrayQueue against Pruner implementations that might throw exceptions (CASSANDRA-16866) 
+ * Harden PrunableArrayQueue against Pruner implementations that might throw exceptions (CASSANDRA-16866)
  * Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing (CASSANDRA-16721)
  * Bump zstd-jni version to 1.5.0-4 (CASSANDRA-16884)
  * Remove assumption that all urgent messages are small (CASSANDRA-16877)
diff --git a/doc/source/new/virtualtables.rst b/doc/source/new/virtualtables.rst
index 1c8766c..0cb988f 100644
--- a/doc/source/new/virtualtables.rst
+++ b/doc/source/new/virtualtables.rst
@@ -38,15 +38,15 @@ How are Virtual Tables different from regular tables?
 
 Virtual tables and virtual keyspaces differ from regular tables and keyspaces in several ways:
 
-- Virtual tables are read-only, but it is likely to change
+- Virtual tables support modifications only if the underlying implementation allows it
 - Virtual tables are not replicated
 - Virtual tables are local only and non distributed
 - Virtual tables have no associated SSTables
 - The consistency level of queries sent to virtual tables is ignored
-- Virtual tables are managed by Cassandra and a user cannot run  DDL to create new virtual tables or DML to modify existing virtual       tables
+- Virtual tables are managed by Cassandra and a user cannot run DDL to create new virtual tables or alter existing virtual tables
 - Virtual tables are created in special keyspaces and not just any keyspace
-- All existing virtual tables use ``LocalPartitioner``. Since a virtual table is not replicated the partitioner sorts in order of     partition   keys instead of by their hash.
-- Making advanced queries with ``ALLOW FILTERING`` and aggregation functions may be used with virtual tables even though in normal  tables we   don't recommend it
+- All existing virtual tables use ``LocalPartitioner``. Since a virtual table is not replicated, the partitioner sorts in order of partition keys instead of by their hash.
+- Advanced queries using ``ALLOW FILTERING`` and aggregation functions may be run against virtual tables, even though they are not recommended for normal tables
 
 Virtual Keyspaces
 ^^^^^^^^^^^^^^^^^
@@ -66,21 +66,21 @@ Virtual Table Limitations
 
 Virtual tables and virtual keyspaces currently have some limitations, though some of these could change:
 
-- Cannot alter or drop virtual keyspaces or tables
-- Cannot truncate virtual tables
 - Expiring columns are not supported by virtual tables
+- Custom timestamps are not supported by virtual tables
 - Conditional updates are not supported by virtual tables
-- Cannot create tables in virtual keyspaces
-- Cannot perform any operations against virtual keyspace
 - Secondary indexes are not supported on virtual tables
-- Cannot create functions in virtual keyspaces
-- Cannot create types in virtual keyspaces
 - Materialized views are not supported on virtual tables
-- Virtual tables don't support ``DELETE`` statements
+- Virtual tables support modifications only if the underlying implementation allows it
 - Cannot ``CREATE TRIGGER`` against a virtual table
 - Conditional ``BATCH`` statements cannot include mutations for virtual tables
 - Cannot include a virtual table statement in a logged batch
 - Mutations for virtual and regular tables cannot exist in the same batch
+- Cannot alter or drop virtual keyspaces or tables
+- Cannot create functions in virtual keyspaces
+- Cannot create types in virtual keyspaces
+- Cannot create tables in virtual keyspaces
+- Cannot perform any operations against a virtual keyspace
 - Cannot create aggregates in virtual keyspaces; but may run aggregate functions on select
 
 Listing and Describing Virtual Tables
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 1a92196..9ac29a0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -145,8 +145,6 @@ public class DeleteStatement extends ModificationStatement
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
-            checkFalse(metadata.isVirtual(), "Virtual tables don't support DELETE statements");
-
             Operations operations = new Operations(type);
 
             for (Operation.RawDeletion deletion : deletions)
@@ -177,6 +175,8 @@ public class DeleteStatement extends ModificationStatement
 
             if (stmt.hasConditions() && !restrictions.hasAllPKColumnsRestrictedByEqualities())
             {
+                checkFalse(stmt.isVirtual(), "DELETE statements must restrict all PRIMARY KEY columns with equality relations");
+
                 checkFalse(operations.appliesToRegularColumns(),
                            "DELETE statements must restrict all PRIMARY KEY columns with equality relations in order to delete non static columns");
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 087f3b0..4ff9928 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -266,6 +266,7 @@ public abstract class ModificationStatement implements CQLStatement
         checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
         checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
         checkFalse(isView(), "Cannot directly modify a materialized view");
+        checkFalse(isVirtual() && attrs.isTimestampSet(), "Custom timestamp is not supported by virtual tables");
         checkFalse(isVirtual() && attrs.isTimeToLiveSet(), "Expiring columns are not supported by virtual tables");
         checkFalse(isVirtual() && hasConditions(), "Conditional updates are not supported by virtual tables");
     }
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 206d116..2c1c994 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -25,8 +25,10 @@ import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -66,9 +68,13 @@ public class TruncateStatement extends QualifiedStatement implements CQLStatemen
                 throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
 
             if (metaData.isVirtual())
-                throw new InvalidRequestException("Cannot truncate virtual tables");
-
-            StorageProxy.truncateBlocking(keyspace(), name());
+            {
+                executeForVirtualTable(metaData.id);
+            }
+            else
+            {
+                StorageProxy.truncateBlocking(keyspace(), name());
+            }
         }
         catch (UnavailableException | TimeoutException e)
         {
@@ -86,10 +92,14 @@ public class TruncateStatement extends QualifiedStatement implements CQLStatemen
                 throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
 
             if (metaData.isVirtual())
-                throw new InvalidRequestException("Cannot truncate virtual tables");
-
-            ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(name());
-            cfs.truncateBlocking();
+            {
+                executeForVirtualTable(metaData.id);
+            }
+            else
+            {
+                ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(name());
+                cfs.truncateBlocking();
+            }
         }
         catch (Exception e)
         {
@@ -97,7 +107,12 @@ public class TruncateStatement extends QualifiedStatement implements CQLStatemen
         }
         return null;
     }
-    
+
+    private void executeForVirtualTable(TableId id)
+    {
+        VirtualKeyspaceRegistry.instance.getTableNullable(id).truncate();
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractMutableVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractMutableVirtualTable.java
new file mode 100644
index 0000000..c273e51
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractMutableVirtualTable.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source
+ * modification via INSERT/UPDATE, DELETE and TRUNCATE operations.
+ *
+ * Virtual table implementations need to be thread-safe as they can be called from different threads.
+ */
+public abstract class AbstractMutableVirtualTable extends AbstractVirtualTable
+{
+
+    protected AbstractMutableVirtualTable(TableMetadata metadata)
+    {
+        super(metadata);
+    }
+
+    @Override
+    public final void apply(PartitionUpdate update)
+    {
+        ColumnValues partitionKey = ColumnValues.from(metadata(), update.partitionKey());
+
+        if (update.deletionInfo().isLive())
+            update.forEach(row ->
+            {
+                ColumnValues clusteringColumns = ColumnValues.from(metadata(), row.clustering());
+
+                if (row.deletion().isLive())
+                {
+                    if (row.columnCount() == 0)
+                    {
+                        applyColumnUpdate(partitionKey, clusteringColumns, Optional.empty());
+                    }
+                    else
+                    {
+                        row.forEach(columnData ->
+                        {
+                            checkFalse(columnData.column().isComplex(), "Complex type columns are not supported by table %s", metadata);
+
+                            Cell<?> cell = (Cell<?>) columnData;
+
+                            if (cell.isTombstone())
+                                applyColumnDeletion(partitionKey, clusteringColumns, columnName(cell));
+                            else
+                                applyColumnUpdate(partitionKey,
+                                        clusteringColumns,
+                                        Optional.of(ColumnValue.from(cell)));
+                        });
+                    }
+                }
+                else
+                    applyRowDeletion(partitionKey, clusteringColumns);
+            });
+        else
+        {
+            // MutableDeletionInfo may have partition delete or range tombstone list or both
+            if (update.deletionInfo().hasRanges())
+                update.deletionInfo()
+                        .rangeIterator(false)
+                        .forEachRemaining(rt -> applyRangeTombstone(partitionKey, toRange(rt.deletedSlice())));
+
+            if (!update.deletionInfo().getPartitionDeletion().isLive())
+                applyPartitionDeletion(partitionKey);
+        }
+    }
+
+    protected void applyPartitionDeletion(ColumnValues partitionKey)
+    {
+        throw invalidRequest("Partition deletion is not supported by table %s", metadata);
+    }
+
+    private Range<ColumnValues> toRange(Slice slice)
+    {
+        ClusteringBound<?> startBound = slice.start();
+        ClusteringBound<?> endBound = slice.end();
+
+        if (startBound.isBottom())
+        {
+            if (endBound.isTop())
+                return Range.all();
+
+            return Range.upTo(ColumnValues.from(metadata(), endBound), boundType(endBound));
+        }
+
+        if (endBound.isTop())
+            return Range.downTo(ColumnValues.from(metadata(), startBound), boundType(startBound));
+
+        ColumnValues start = ColumnValues.from(metadata(), startBound);
+        BoundType startType = boundType(startBound);
+
+        ColumnValues end = ColumnValues.from(metadata(), endBound);
+        BoundType endType = boundType(endBound);
+
+        return Range.range(start, startType, end, endType);
+    }
+
+    private static BoundType boundType(ClusteringBound<?> bound)
+    {
+        return bound.isInclusive() ? BoundType.CLOSED : BoundType.OPEN;
+    }
+
+    protected void applyRangeTombstone(ColumnValues partitionKey, Range<ColumnValues> range)
+    {
+        throw invalidRequest("Range deletion is not supported by table %s", metadata);
+    }
+
+    protected void applyRowDeletion(ColumnValues partitionKey, ColumnValues clusteringColumnValues)
+    {
+        throw invalidRequest("Row deletion is not supported by table %s", metadata);
+    }
+
+    protected void applyColumnDeletion(ColumnValues partitionKey, ColumnValues clusteringColumns, String columnName)
+    {
+        throw invalidRequest("Column deletion is not supported by table %s", metadata);
+    }
+
+    protected void applyColumnUpdate(ColumnValues partitionKey,
+                                     ColumnValues clusteringColumns,
+                                     Optional<ColumnValue> columnValue)
+    {
+        throw invalidRequest("Column modification is not supported by table %s", metadata);
+    }
+
+    private static String columnName(Cell<?> cell)
+    {
+        return cell.column().name.toCQLString();
+    }
+
+    /**
+     * A set of partition key or clustering column values.
+     */
+    public static final class ColumnValues implements Comparable<ColumnValues>
+    {
+        /**
+         * An empty set of column values.
+         */
+        private static final ColumnValues EMPTY = new ColumnValues(ImmutableList.of(), ArrayUtils.EMPTY_OBJECT_ARRAY);
+
+        /**
+         * The column metadata for the set of columns.
+         */
+        private final ImmutableList<ColumnMetadata> metadata;
+
+        /**
+         * The column values. The number of values can be smaller than the number of columns if only
+         * a subset of the column values is specified (e.g. a clustering prefix).
+         */
+        private final Object[] values;
+
+        /**
+         * Returns the set of column values corresponding to the specified partition key.
+         *
+         * @param metadata the table metadata
+         * @param partitionKey the partition key
+         * @return the set of column values corresponding to the specified partition key
+         */
+        public static ColumnValues from(TableMetadata metadata, DecoratedKey partitionKey)
+        {
+            if (metadata.partitionKeyType instanceof CompositeType)
+            {
+                ByteBuffer[] buffers = ((CompositeType) metadata.partitionKeyType).split(partitionKey.getKey());
+                return ColumnValues.from(metadata.partitionKeyColumns(), buffers);
+            }
+
+            return ColumnValues.from(metadata.partitionKeyColumns(), partitionKey.getKey());
+        }
+
+        /**
+         * Returns the set of column values corresponding to the specified clustering prefix.
+         *
+         * @param metadata the table metadata
+         * @param prefix the clustering prefix
+         * @return the set of column values corresponding to the specified clustering prefix
+         */
+        public static ColumnValues from(TableMetadata metadata, ClusteringPrefix<?> prefix)
+        {
+            if (prefix == Clustering.EMPTY)
+                return EMPTY;
+
+            return ColumnValues.from(metadata.clusteringColumns(), prefix.getBufferArray());
+        }
+
+        private static ColumnValues from(ImmutableList<ColumnMetadata> metadata, ByteBuffer... buffers)
+        {
+            return new ColumnValues(metadata, convert(metadata, buffers));
+        }
+
+        /**
+         * Create a {@code ColumnValues} for the specified set of columns.
+         *
+         * @param metadata the partition or clustering columns metadata
+         * @param values the partition or clustering column values
+         */
+        public ColumnValues(List<ColumnMetadata> metadata, Object... values)
+        {
+            this.metadata = ImmutableList.copyOf(metadata);
+            this.values = values;
+        }
+
+        /**
+         * Deserializes the column values.
+         *
+         * @param metadata the column metadata
+         * @param buffers the serialized column values
+         * @return the deserialized column values
+         */
+        private static Object[] convert(ImmutableList<ColumnMetadata> metadata, ByteBuffer[] buffers)
+        {
+            Object[] values = new Object[buffers.length];
+            for (int i = 0; i < buffers.length; i++)
+            {
+                values[i] = metadata.get(i).type.compose(buffers[i]);
+            }
+            return values;
+        }
+
+        /**
+         * Returns the name of the specified column
+         *
+         * @param i the column index
+         * @return the column name
+         */
+        public String name(int i)
+        {
+            Preconditions.checkPositionIndex(i, values.length);
+            return metadata.get(i).name.toCQLString();
+        }
+
+        /**
+         * Returns the value for the specified column
+         *
+         * @param i the column index
+         * @return the column value
+         */
+        @SuppressWarnings("unchecked")
+        public <V> V value(int i)
+        {
+            Preconditions.checkPositionIndex(i, values.length);
+            return (V) values[i];
+        }
+
+        /**
+         * Returns the number of column values.
+         *
+         * @return the number of column values.
+         */
+        public int size()
+        {
+            return values.length;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder();
+            builder.append('[');
+            for (int i = 0, m = metadata.size(); i < m; i++)
+            {
+                if (i != 0)
+                    builder.append(", ");
+
+                builder.append(metadata.get(i).name.toCQLString())
+                       .append(" : ");
+
+                builder.append(i < values.length ? values[i].toString() : "unspecified");
+            }
+            return builder.append(']').toString();
+        }
+
+        @Override
+        public int compareTo(ColumnValues o)
+        {
+            assert metadata.equals(o.metadata);
+
+            int s1 = size();
+            int s2 = o.size();
+            int minSize = Math.min(s1, s2);
+
+            for (int i = 0; i < minSize; i++)
+            {
+                int cmp = compare(values[i], o.values[i]);
+                if (cmp != 0)
+                    return cmp;
+            }
+
+            return 0;
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T extends Comparable<T>> int compare(Object c1, Object c2)
+        {
+            return ((T) c1).compareTo((T) c2);
+        }
+    }
+
+    /**
+     * A regular column value.
+     */
+    public static final class ColumnValue
+    {
+        /**
+         * The column metadata
+         */
+        private final ColumnMetadata metadata;
+
+        /**
+         * The column value
+         */
+        private final Object value;
+
+        /**
+         * Returns the column value corresponding to the specified cell.
+         *
+         * @param cell the column cell metadata
+         * @return the column value corresponding to the specified cell
+         */
+        public static ColumnValue from(Cell<?> cell)
+        {
+            ColumnMetadata metadata = cell.column();
+            return new ColumnValue(metadata, metadata.type.compose(cell.buffer()));
+        }
+
+        private ColumnValue(ColumnMetadata metadata, Object value)
+        {
+            this.metadata = metadata;
+            this.value = value;
+        }
+
+        /**
+         * Returns the column name.
+         *
+         * @return the column name
+         */
+        public String name()
+        {
+            return metadata.name.toCQLString();
+        }
+
+        /**
+         * Returns the column value.
+         *
+         * @return the column value
+         */
+        @SuppressWarnings("unchecked")
+        public <V> V value()
+        {
+            return (V) value;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s : %s", name(), value());
+        }
+    }
+}
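
A minimal sketch (not part of the patch) of a concrete table built on the new AbstractMutableVirtualTable: a single partition key and one regular column backed by a ConcurrentMap. Only column updates, partition deletion and truncation are overridden; every hook left untouched keeps the default behaviour above of rejecting the request. The class, keyspace and table names are hypothetical.

    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.cassandra.db.marshal.Int32Type;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.db.virtual.AbstractMutableVirtualTable;
    import org.apache.cassandra.db.virtual.SimpleDataSet;
    import org.apache.cassandra.schema.TableMetadata;

    public final class ExampleMutableTable extends AbstractMutableVirtualTable
    {
        // key -> value; a ConcurrentMap keeps the implementation thread-safe, as required above
        private final ConcurrentMap<String, Integer> rows = new ConcurrentHashMap<>();

        public ExampleMutableTable(String keyspace)
        {
            super(TableMetadata.builder(keyspace, "example")
                               .kind(TableMetadata.Kind.VIRTUAL)
                               .addPartitionKeyColumn("key", UTF8Type.instance)
                               .addRegularColumn("value", Int32Type.instance)
                               .build());
        }

        @Override
        public DataSet data()
        {
            SimpleDataSet data = new SimpleDataSet(metadata());
            rows.forEach((key, value) -> data.row(key).column("value", value));
            return data;
        }

        @Override
        protected void applyColumnUpdate(ColumnValues partitionKey,
                                         ColumnValues clusteringColumns,
                                         Optional<ColumnValue> columnValue)
        {
            // INSERT/UPDATE of the single regular column; a ConcurrentHashMap cannot store null,
            // so an update without a value (a bare row marker) is simply ignored here
            columnValue.ifPresent(value -> rows.put(partitionKey.value(0), value.value()));
        }

        @Override
        protected void applyPartitionDeletion(ColumnValues partitionKey)
        {
            rows.remove(partitionKey.value(0)); // DELETE FROM ks.example WHERE key = ...
        }

        @Override
        public void truncate()
        {
            rows.clear(); // TRUNCATE ks.example
        }
    }
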
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index c2de1db..96fb7f9 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -48,7 +48,7 @@ public abstract class AbstractVirtualTable implements VirtualTable
     protected AbstractVirtualTable(TableMetadata metadata)
     {
         if (!metadata.isVirtual())
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException("Cannot instantiate a non-virtual table");
 
         this.metadata = metadata;
     }
@@ -127,6 +127,12 @@ public abstract class AbstractVirtualTable implements VirtualTable
         throw new InvalidRequestException("Modification is not supported by table " + metadata);
     }
 
+    @Override
+    public void truncate()
+    {
+        throw new InvalidRequestException("Truncation is not supported by table " + metadata);
+    }
+
     public interface DataSet
     {
         boolean isEmpty();
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index ea196ca..5373f4c 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -48,7 +48,7 @@ public interface VirtualTable
     TableMetadata metadata();
 
     /**
-     * Applies the specified update.
+     * Applies the specified update, if supported.
      * @param update the update to apply
      */
     void apply(PartitionUpdate update);
@@ -71,4 +71,9 @@ public interface VirtualTable
      * @return the rows corresponding to the requested data.
      */
     UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter);
+
+    /**
+     * Truncates data from the underlying source, if supported.
+     */
+    void truncate();
 }
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
index 9808c96..5d3b134 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
@@ -18,10 +18,23 @@
 package org.apache.cassandra.cql3.validation.entities;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.NavigableMap;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nonnull;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
+
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -31,18 +44,18 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.Partition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.AbstractMutableVirtualTable;
 import org.apache.cassandra.db.virtual.AbstractVirtualTable;
 import org.apache.cassandra.db.virtual.SimpleDataSet;
 import org.apache.cassandra.db.virtual.VirtualKeyspace;
 import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.db.virtual.VirtualTable;
-import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.triggers.ITrigger;
 
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -52,53 +65,152 @@ public class VirtualTableTest extends CQLTester
     private static final String VT1_NAME = "vt1";
     private static final String VT2_NAME = "vt2";
     private static final String VT3_NAME = "vt3";
+    private static final String VT4_NAME = "vt4";
 
-    private static class WritableVirtualTable extends AbstractVirtualTable
+    // As long as we execute test queries using execute (and not executeNet), the virtual table implementations
+    // do not need to be thread-safe. We make them thread-safe anyway to avoid issues if the test framework changes
+    // or somebody decides to use the class with executeNet. It also provides a better example in case somebody is
+    // looking at the test to learn how to create mutable virtual tables.
+    private static class MutableVirtualTable extends AbstractMutableVirtualTable
     {
-        private final ColumnMetadata valueColumn;
-        private final Map<String, Integer> backingMap = new HashMap<>();
+        // <pk1, pk2> -> c1 -> c2 -> <v1, v2>
+        private final Map<Pair<String, String>, NavigableMap<String, NavigableMap<String, Pair<Number, Number>>>> backingMap = new ConcurrentHashMap<>();
 
-        WritableVirtualTable(String keyspaceName, String tableName)
+        MutableVirtualTable(String keyspaceName, String tableName)
         {
             super(TableMetadata.builder(keyspaceName, tableName)
                                .kind(TableMetadata.Kind.VIRTUAL)
-                               .addPartitionKeyColumn("key", UTF8Type.instance)
-                               .addRegularColumn("value", Int32Type.instance)
+                               .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                               .addPartitionKeyColumn("pk2", UTF8Type.instance)
+                               .addClusteringColumn("c1", UTF8Type.instance)
+                               .addClusteringColumn("c2", UTF8Type.instance)
+                               .addRegularColumn("v1", Int32Type.instance)
+                               .addRegularColumn("v2", LongType.instance)
                                .build());
-            valueColumn = metadata().regularColumns().getSimple(0);
         }
 
         @Override
         public DataSet data()
         {
             SimpleDataSet data = new SimpleDataSet(metadata());
-            backingMap.forEach((key, value) -> data.row(key).column("value", value));
+            backingMap.forEach((pkPair, c1Map) ->
+                    c1Map.forEach((c1, c2Map) ->
+                    c2Map.forEach((c2, valuePair) -> data.row(pkPair.getLeft(), pkPair.getRight(), c1, c2)
+                                                         .column("v1", valuePair.getLeft())
+                                                         .column("v2", valuePair.getRight()))));
             return data;
         }
 
         @Override
-        public void apply(PartitionUpdate update)
+        protected void applyPartitionDeletion(ColumnValues partitionKeyColumns)
+        {
+            backingMap.remove(toPartitionKey(partitionKeyColumns));
+        }
+
+        @Override
+        protected void applyRangeTombstone(ColumnValues partitionKeyColumns, Range<ColumnValues> range)
+        {
+            Optional<NavigableMap<String, NavigableMap<String, Pair<Number, Number>>>> mayBePartition = getPartition(partitionKeyColumns);
+
+            if (!mayBePartition.isPresent())
+                return;
+
+            NavigableMap<String, NavigableMap<String, Pair<Number, Number>>> selection = mayBePartition.get();
+
+            for (String c1 : ImmutableList.copyOf(selection.keySet()))
+            {
+                NavigableMap<String, Pair<Number, Number>> rows = selection.get(c1);
+
+                for (String c2 : ImmutableList.copyOf(selection.get(c1).keySet()))
+                {
+                    if (range.contains(new ColumnValues(metadata().clusteringColumns(), c1, c2)))
+                        rows.remove(c2);
+                }
+
+                if (rows.isEmpty())
+                    selection.remove(c1);
+            }
+        }
+
+        @Override
+        protected void applyRowDeletion(ColumnValues partitionKeyColumns, ColumnValues clusteringColumns)
+        {
+            getRows(partitionKeyColumns, clusteringColumns.value(0)).ifPresent(rows -> rows.remove(clusteringColumns.value(1)));
+        }
+
+        @Override
+        protected void applyColumnDeletion(ColumnValues partitionKeyColumns, ColumnValues clusteringColumns, String columnName)
+        {
+            getRows(partitionKeyColumns, clusteringColumns.value(0)).ifPresent(rows -> rows.computeIfPresent(clusteringColumns.value(1),
+                                                                                                             (c, p) -> updateColumn(p, columnName, null)));
+        }
+
+        @Override
+        protected void applyColumnUpdate(ColumnValues partitionKeyColumns,
+                                         ColumnValues clusteringColumns,
+                                         Optional<ColumnValue> mayBeColumnValue)
         {
-            String key = (String) metadata().partitionKeyType.compose(update.partitionKey().getKey());
-            update.forEach(row ->
-                           {
-                               Integer value = Int32Type.instance.compose(row.getCell(valueColumn).buffer());
-                               backingMap.put(key, value);
-                           });
+            Pair<String, String> pkPair = toPartitionKey(partitionKeyColumns);
+            backingMap.computeIfAbsent(pkPair, ignored -> new ConcurrentSkipListMap<>())
+                      .computeIfAbsent(clusteringColumns.value(0), ignored -> new ConcurrentSkipListMap<>())
+                      .compute(clusteringColumns.value(1), (ignored, p) -> updateColumn(p, mayBeColumnValue));
+        }
+
+        @Override
+        public void truncate()
+        {
+            backingMap.clear();
+        }
+
+        private Optional<NavigableMap<String, Pair<Number, Number>>> getRows(ColumnValues partitionKeyColumns, Comparable<?> firstClusteringColumn)
+        {
+            return getPartition(partitionKeyColumns).map(p -> p.get(firstClusteringColumn));
+        }
+
+        private Optional<NavigableMap<String, NavigableMap<String, Pair<Number, Number>>>> getPartition(ColumnValues partitionKeyColumns)
+        {
+            Pair<String, String> pk = toPartitionKey(partitionKeyColumns);
+            return Optional.ofNullable(backingMap.get(pk));
+        }
+
+        private Pair<String, String> toPartitionKey(ColumnValues partitionKey)
+        {
+            return Pair.of(partitionKey.value(0), partitionKey.value(1));
+        }
+
+        private static Pair<Number, Number> updateColumn(@Nonnull Pair<Number, Number> row,
+                                                         String columnName,
+                                                         Number newValue)
+        {
+            return "v1".equals(columnName) ? Pair.of(newValue, row.getRight())
+                                           : Pair.of(row.getLeft(), newValue);
+        }
+
+        private static Pair<Number, Number> updateColumn(Pair<Number, Number> row,
+                                                         Optional<ColumnValue> mayBeColumnValue)
+        {
+            Pair<Number, Number> r = row != null ? row : Pair.of(null, null);
+
+            if (mayBeColumnValue.isPresent())
+            {
+                ColumnValue newValue = mayBeColumnValue.get();
+                return updateColumn(r, newValue.name(), newValue.value());
+            }
+
+            return r;
         }
     }
 
     @BeforeClass
     public static void setUpClass()
     {
-        TableMetadata vt1Metadata =
-        TableMetadata.builder(KS_NAME, VT1_NAME)
-                     .kind(TableMetadata.Kind.VIRTUAL)
-                     .addPartitionKeyColumn("pk", UTF8Type.instance)
-                     .addClusteringColumn("c", UTF8Type.instance)
-                     .addRegularColumn("v1", Int32Type.instance)
-                     .addRegularColumn("v2", LongType.instance)
-                     .build();
+        TableMetadata vt1Metadata = TableMetadata.builder(KS_NAME, VT1_NAME)
+                .kind(TableMetadata.Kind.VIRTUAL)
+                .addPartitionKeyColumn("pk", UTF8Type.instance)
+                .addClusteringColumn("c", UTF8Type.instance)
+                .addRegularColumn("v1", Int32Type.instance)
+                .addRegularColumn("v2", LongType.instance)
+                .build();
 
         SimpleDataSet vt1data = new SimpleDataSet(vt1Metadata);
 
@@ -116,18 +228,17 @@ public class VirtualTableTest extends CQLTester
                 return vt1data;
             }
         };
-        VirtualTable vt2 = new WritableVirtualTable(KS_NAME, VT2_NAME);
-
-        TableMetadata vt3Metadata =
-        TableMetadata.builder(KS_NAME, VT3_NAME)
-                     .kind(TableMetadata.Kind.VIRTUAL)
-                     .addPartitionKeyColumn("pk1", UTF8Type.instance)
-                     .addPartitionKeyColumn("pk2", UTF8Type.instance)
-                     .addClusteringColumn("ck1", UTF8Type.instance)
-                     .addClusteringColumn("ck2", UTF8Type.instance)
-                     .addRegularColumn("v1", Int32Type.instance)
-                     .addRegularColumn("v2", LongType.instance)
-                     .build();
+        VirtualTable vt2 = new MutableVirtualTable(KS_NAME, VT2_NAME);
+
+        TableMetadata vt3Metadata = TableMetadata.builder(KS_NAME, VT3_NAME)
+                .kind(TableMetadata.Kind.VIRTUAL)
+                .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                .addPartitionKeyColumn("pk2", UTF8Type.instance)
+                .addClusteringColumn("ck1", UTF8Type.instance)
+                .addClusteringColumn("ck2", UTF8Type.instance)
+                .addRegularColumn("v1", Int32Type.instance)
+                .addRegularColumn("v2", LongType.instance)
+                .build();
 
         SimpleDataSet vt3data = new SimpleDataSet(vt3Metadata);
 
@@ -141,13 +252,104 @@ public class VirtualTableTest extends CQLTester
                 return vt3data;
             }
         };
-        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2, vt3)));
+
+        TableMetadata vt4Metadata = TableMetadata.builder(KS_NAME, VT4_NAME)
+                .kind(TableMetadata.Kind.VIRTUAL)
+                .addPartitionKeyColumn("pk", UTF8Type.instance)
+                .addRegularColumn("v", LongType.instance)
+                .build();
+
+        // As long as we execute test queries using execute (and not executeNet), the virtual table implementations
+        // do not need to be thread-safe. We make them thread-safe anyway to avoid issues if the test framework changes
+        // or somebody decides to use the class with executeNet. It also provides a better example in case somebody is
+        // looking at the test to learn how to create mutable virtual tables.
+        VirtualTable vt4 = new AbstractMutableVirtualTable(vt4Metadata)
+        {
+            // CHM cannot be used here as it does not accept null values
+            private final AtomicReference<Map<String, Long>> table = new AtomicReference<Map<String, Long>>(Collections.emptyMap());
+
+            @Override
+            public DataSet data()
+            {
+                SimpleDataSet data = new SimpleDataSet(metadata());
+                table.get().forEach((pk, v) -> data.row(pk).column("v", v));
+                return data;
+            }
+
+            @Override
+            protected void applyPartitionDeletion(ColumnValues partitionKey)
+            {
+                Map<String, Long> oldMap;
+                Map<String, Long> newMap;
+                do
+                {
+                    oldMap = table.get();
+                    newMap = new HashMap<>(oldMap);
+                    newMap.remove(partitionKey.value(0));
+                }
+                while(!table.compareAndSet(oldMap, newMap));
+            }
+
+            @Override
+            protected void applyColumnDeletion(ColumnValues partitionKey,
+                                               ColumnValues clusteringColumns,
+                                               String columnName)
+            {
+                Map<String, Long> oldMap;
+                Map<String, Long> newMap;
+                do
+                {
+                    oldMap = table.get();
+
+                    if (!oldMap.containsKey(partitionKey.value(0)))
+                        break;
+
+                    newMap = new HashMap<>(oldMap);
+                    newMap.put(partitionKey.value(0), null);
+                }
+                while(!table.compareAndSet(oldMap, newMap));
+            }
+
+            @Override
+            protected void applyColumnUpdate(ColumnValues partitionKey,
+                                             ColumnValues clusteringColumns,
+                                             Optional<ColumnValue> columnValue)
+            {
+                Map<String, Long> oldMap;
+                Map<String, Long> newMap;
+                do
+                {
+                    oldMap = table.get();
+                    if (oldMap.containsKey(partitionKey.value(0)) && !columnValue.isPresent())
+                        break;
+                    newMap = new HashMap<>(oldMap);
+                    newMap.put(partitionKey.value(0), columnValue.isPresent() ? columnValue.get().value() : null);
+                }
+                while(!table.compareAndSet(oldMap, newMap));
+            }
+
+            @Override
+            public void truncate()
+            {
+                Map<String, Long> oldMap;
+                do
+                {
+                    oldMap = table.get();
+                    if (oldMap.isEmpty())
+                        break;
+                }
+                while(!table.compareAndSet(oldMap, Collections.emptyMap()));
+            }
+
+        };
+
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2, vt3, vt4)));
 
         CQLTester.setUpClass();
     }
 
     @Test
-    public void testQueries() throws Throwable
+    public void testReadOperationsOnReadOnlyTable() throws Throwable
     {
         assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'UNKNOWN'"));
 
@@ -232,7 +434,7 @@ public class VirtualTableTest extends CQLTester
     }
 
     @Test
-    public void testQueriesOnTableWithMultiplePks() throws Throwable
+    public void testReadOperationsOnReadOnlyTableWithMultiplePks() throws Throwable
     {
         assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt3 WHERE pk1 = 'UNKNOWN' AND pk2 = 'UNKNOWN'"));
 
@@ -249,76 +451,510 @@ public class VirtualTableTest extends CQLTester
     }
 
     @Test
-    public void testModifications() throws Throwable
+    public void testDMLOperationsOnMutableCompositeTable() throws Throwable
+    {
+        // check for a clean state
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+
+        // fill the table, test UNLOGGED batch
+        execute("BEGIN UNLOGGED BATCH " +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  1, v2 =  1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  2, v2 =  2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  3, v2 =  3 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  4, v2 =  4 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  5, v2 =  5 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  6, v2 =  6 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  7, v2 =  7 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  8, v2 =  8 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  9, v2 =  9 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 10, v2 = 10 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_2';" +
+                "APPLY BATCH");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // update a single column with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 11 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 11, 1L));
+
+        // update multiple columns with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 111, v2 = 111 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 111, 111L));
+
+        // update a single columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 22)");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 22L));
+
+        // update multiple columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 222, 222)");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 222, 222L));
+
+        // delete a single partition
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // delete a first-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 <= 'c1_1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // delete a first-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 > 'c1_1' AND c1 < 'c1_3'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // delete multiple rows
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a second-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 > 'c2_5'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L));
+
+        // delete a second-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 >= 'c2_3' AND c2 < 'c2_5'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L));
+
+        // delete a single row
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L));
+
+        // delete a single column
+        execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", null, 3L));
+
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
+
+    @Test
+    public void testRangeDeletionWithMulticolumnRestrictionsOnMutableTable() throws Throwable
+    {
+        // check for a clean state
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+
+        // fill the table, test UNLOGGED batch
+        execute("BEGIN UNLOGGED BATCH " +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  1, v2 =  1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  2, v2 =  2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  3, v2 =  3 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  4, v2 =  4 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  5, v2 =  5 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  6, v2 =  6 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  7, v2 =  7 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  8, v2 =  8 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  9, v2 =  9 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 10, v2 = 10 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 11, v2 = 11 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 12, v2 = 12 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_4';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 13, v2 = 13 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 14, v2 = 14 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_3' AND c2 = 'c2_1';" +
+                "APPLY BATCH");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_4", 12, 12L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_5", 13, 13L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
+        // Test deletion with multiple columns equality
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND (c1, c2) = ('c1_1', 'c2_5')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_4", 12, 12L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_5", 13, 13L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
+        // Test deletion with multi-column slices of different lengths on both sides
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 >= 'c1_1' AND (c1, c2) <= ('c1_1', 'c2_5')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_4", 12, 12L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_5", 13, 13L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
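+        // Test deletion with a multi-column lower bound and a shorter upper bound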
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND (c1, c2) > ('c1_2', 'c2_3') AND (c1) < ('c1_3')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
+        // Test deletion with only a multi-column upper-bound slice
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND (c1, c2) < ('c1_2', 'c2_2')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
+        // Test deletion with only a multi-column lower-bound slice
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND (c1, c2) > ('c1_1', 'c2_1')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L),
+                row("pk1_1", "pk2_3", "c1_3", "c2_1", 14, 14L));
+
+        // Test deletion with multi-column IN
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND (c1, c2) IN (('c1_2', 'c2_2'), ('c1_3', 'c2_1'), ('c1_4', 'c2_1'))");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_3", 11, 11L));
+
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
+
+    @Test
+    public void testDMLOperationsOnMutableNonCompositeTable() throws Throwable
+    {
+        // check for a clean state
+        execute("TRUNCATE test_virtual_ks.vt4");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt4"));
+
+        // fill the table, test UNLOGGED batch
+        execute("BEGIN UNLOGGED BATCH " +
+                "INSERT INTO test_virtual_ks.vt4 (pk, v) VALUES ('pk1', 1);" +
+                "INSERT INTO test_virtual_ks.vt4 (pk, v) VALUES ('pk2', 2);" +
+                "INSERT INTO test_virtual_ks.vt4 (pk, v) VALUES ('pk3', 3);" +
+                "INSERT INTO test_virtual_ks.vt4 (pk, v) VALUES ('pk4', 4);" +
+                "APPLY BATCH");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk2", 2L),
+                row("pk3", 3L),
+                row("pk4", 4L));
+
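+        // update a single column with UPDATE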
+        execute("UPDATE test_virtual_ks.vt4 SET v = 3 WHERE pk = 'pk1'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 3L),
+                row("pk2", 2L),
+                row("pk3", 3L),
+                row("pk4", 4L));
+
+        // update a single column with INSERT
+        execute("INSERT INTO test_virtual_ks.vt4 (pk, v) VALUES ('pk1', 1);");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk2", 2L),
+                row("pk3", 3L),
+                row("pk4", 4L));
+
+        // update no columns via INSERT (primary key only)
+        execute("INSERT INTO test_virtual_ks.vt4 (pk) VALUES ('pk1');");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk2", 2L),
+                row("pk3", 3L),
+                row("pk4", 4L));
+
+        // insert a new primary key only
+        execute("INSERT INTO test_virtual_ks.vt4 (pk) VALUES ('pk5');");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk2", 2L),
+                row("pk3", 3L),
+                row("pk4", 4L),
+                row("pk5", null));
+
+        // delete a single partition
+        execute("DELETE FROM test_virtual_ks.vt4 WHERE pk = 'pk2'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk3", 3L),
+                row("pk4", 4L),
+                row("pk5", null));
+
+        // delete a single column
+        execute("DELETE v FROM test_virtual_ks.vt4 WHERE pk = 'pk4'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt4"),
+                row("pk1", 1L),
+                row("pk3", 3L),
+                row("pk4", null),
+                row("pk5", null));
+
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt4");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt4"));
+    }
+
+    @Test
+    public void testInsertRowWithoutRegularColumnsOperationOnMutableTable() throws Throwable
+    {
+        // check for a clean state
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+
+        // insert a row with only its primary key (no regular columns)
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", null, null));
+
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
+
+    @Test
+    public void testDeleteWithInOperationsOnMutableTable() throws Throwable
     {
-        // check for clean state
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
+        // check for a clean state
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
 
         // fill the table, test UNLOGGED batch
         execute("BEGIN UNLOGGED BATCH " +
-                "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  1, v2 =  1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  2, v2 =  2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  3, v2 =  3 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  4, v2 =  4 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_2' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  5, v2 =  5 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  6, v2 =  6 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_2' AND c2 = 'c2_6';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  7, v2 =  7 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  8, v2 =  8 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 =  9, v2 =  9 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 10, v2 = 10 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 = 'c1_2' AND c2 = 'c2_2';" +
                 "APPLY BATCH");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
-                   row("pk1", 1),
-                   row("pk2", 2),
-                   row("pk3", 3));
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                row("pk1_1", "pk2_2", "c1_1", "c2_2", 2, 2L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_2", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_1", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // delete multiple partitions with IN
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 IN('pk2_1', 'pk2_2')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_2", 8, 8L),
+                row("pk1_1", "pk2_3", "c1_1", "c2_1", 9, 9L),
+                row("pk1_1", "pk2_3", "c1_2", "c2_2", 10, 10L));
+
+        // delete multiple rows via first-level IN
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_3' AND c1 IN('c1_1', 'c1_2')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_2", 8, 8L));
+
+        // delete multiple rows via second-level IN
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 IN('c2_1', 'c2_2')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", 6, 6L));
+
+        // delete multiple rows with first-level IN and second-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 IN('c1_1', 'c1_2') AND c2 <= 'c2_3'");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", 6, 6L));
+
+        // delete multiple rows via first-level and second-level IN
+        execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 IN('c1_1', 'c1_2') AND c2 IN('c2_5', 'c2_6')");
+        assertRowsIgnoringOrder(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", null, 5L),
+                row("pk1_2", "pk2_1", "c1_2", "c2_6", null, 6L));
+
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
 
-        // test that LOGGED batches don't allow virtual table updates
+    @Test
+    public void testInvalidDMLOperationsOnMutableTable() throws Throwable
+    {
+        // test that LOGGED batch doesn't allow virtual table updates
         assertInvalidMessage("Cannot include a virtual table statement in a logged batch",
-                             "BEGIN BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
-                             "APPLY BATCH");
+                "BEGIN BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
 
         // test that UNLOGGED batch doesn't allow mixing updates for regular and virtual tables
-        createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
+        createTable("CREATE TABLE %s (pk1 text, pk2 text, c1 text, c2 text, v1 int, v2 bigint, PRIMARY KEY ((pk1, pk2), c1, c2))");
         assertInvalidMessage("Mutations for virtual and regular tables cannot exist in the same batch",
-                             "BEGIN UNLOGGED BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1'" +
-                             "UPDATE %s                  SET value = 2 WHERE key ='pk2'" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3'" +
-                             "APPLY BATCH");
-
-        // update a single value with UPDATE
-        execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk1'"),
-                   row("pk1", 11));
-
-        // update a single value with INSERT
-        executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22)");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk2'"),
-                   row("pk2", 22));
-
-        // test that deletions are (currently) rejected
-        assertInvalidMessage("Virtual tables don't support DELETE statements",
-                             "DELETE FROM test_virtual_ks.vt2 WHERE key ='pk1'");
+                "BEGIN UNLOGGED BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE %s                  SET v1 = 2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
+
+        // test that TIMESTAMP is (currently) rejected with INSERT and UPDATE
+        assertInvalidMessage("Custom timestamp is not supported by virtual tables",
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) VALUES ('pk1', 'pk2', 'c1', 'c2', 1, 11) USING TIMESTAMP 123456789");
+        assertInvalidMessage("Custom timestamp is not supported by virtual tables",
+                "UPDATE test_virtual_ks.vt2 USING TIMESTAMP 123456789 SET v1 = 1, v2 = 11 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2'");
 
         // test that TTL is (currently) rejected with INSERT and UPDATE
         assertInvalidMessage("Expiring columns are not supported by virtual tables",
-                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk1', 11) USING TTL 86400");
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) VALUES ('pk1', 'pk2', 'c1', 'c2', 1, 11) USING TTL 86400");
         assertInvalidMessage("Expiring columns are not supported by virtual tables",
-                             "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET value = 11 WHERE key ='pk1'");
+                "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET v1 = 1, v2 = 11 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2'");
 
-        // test that LWT is (currently) rejected with virtual tables in batches
+        // test that LWT is (currently) rejected with BATCH
         assertInvalidMessage("Conditional BATCH statements cannot include mutations for virtual tables",
-                             "BEGIN UNLOGGED BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2;" +
-                             "APPLY BATCH");
+                "BEGIN UNLOGGED BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2' IF v1 = 2;" +
+                        "APPLY BATCH");
 
-        // test that LWT is (currently) rejected with virtual tables in UPDATEs
+        // test that LWT is (currently) rejected with INSERT and UPDATE
         assertInvalidMessage("Conditional updates are not supported by virtual tables",
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2");
-
-        // test that LWT is (currently) rejected with virtual tables in INSERTs
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1) VALUES ('pk1', 'pk2', 'c1', 'c2', 2) IF NOT EXISTS");
         assertInvalidMessage("Conditional updates are not supported by virtual tables",
-                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22) IF NOT EXISTS");
+                "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2' IF v1 = 2");
+
+        // test that a row DELETE which does not restrict the full primary key with equality relations is (currently) rejected
+        assertInvalidMessage("Some partition key parts are missing: pk2",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND c1 = 'c1' AND c2 > 'c2'");
+        assertInvalidMessage("Only EQ and IN relation are supported on the partition key (unless you use the token() function) for DELETE statements",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 > 'pk2' AND c1 = 'c1' AND c2 > 'c2'");
+        assertInvalidMessage("KEY column \"c2\" cannot be restricted as preceding column \"c1\" is not restricted",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c2 > 'c2'");
+        assertInvalidMessage("Clustering column \"c2\" cannot be restricted (preceding column \"c1\" is restricted by a non-EQ relation)",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 > 'c1' AND c2 > 'c2'");
+        assertInvalidMessage("DELETE statements must restrict all PRIMARY KEY columns with equality relations",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 > 'c2' IF v1 = 2");
+
+        // test that a column DELETE which does not restrict the full primary key with equality relations is (currently) rejected
+        assertInvalidMessage("Range deletions are not supported for specific columns",
+                "DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1'");
+        assertInvalidMessage("Range deletions are not supported for specific columns",
+                "DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 > 'c2'");
+    }
+
+    @Test
+    public void testInvalidDMLOperationsOnReadOnlyTable() throws Throwable
+    {
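+        // vt1 is read-only, so INSERT, UPDATE, DELETE and TRUNCATE must all be rejected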
+        assertInvalidMessage("Modification is not supported by table test_virtual_ks.vt1",
+                "INSERT INTO test_virtual_ks.vt1 (pk, c, v1, v2) VALUES ('pk1_1', 'ck1_1', 11, 11)");
+
+        assertInvalidMessage("Modification is not supported by table test_virtual_ks.vt1",
+                "UPDATE test_virtual_ks.vt1 SET v1 = 11, v2 = 11 WHERE pk = 'pk1_1' AND c = 'ck1_1'");
+
+        assertInvalidMessage("Modification is not supported by table test_virtual_ks.vt1",
+                "DELETE FROM test_virtual_ks.vt1 WHERE pk = 'pk1_1' AND c = 'ck1_1'");
+
+        assertInvalidMessage("Error during truncate: Truncation is not supported by table test_virtual_ks.vt1",
+                "TRUNCATE TABLE test_virtual_ks.vt1");
     }
 
     @Test
-    public void testInvalidDDLOperations() throws Throwable
+    public void testInvalidDDLOperationsOnVirtualKeyspaceAndReadOnlyTable() throws Throwable
     {
         assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
                              "DROP KEYSPACE test_virtual_ks");
@@ -338,9 +974,6 @@ public class VirtualTableTest extends CQLTester
         assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
                              "ALTER TABLE test_virtual_ks.vt1 DROP v1");
 
-        assertInvalidMessage("Error during truncate: Cannot truncate virtual tables",
-                             "TRUNCATE TABLE test_virtual_ks.vt1");
-
         assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
                              "CREATE INDEX ON test_virtual_ks.vt1 (v1)");
 
