You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2017/09/25 07:41:37 UTC

[13/13] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4734ce7d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4734ce7d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4734ce7d

Branch: refs/heads/cassandra-3.11
Commit: 4734ce7d94945126e40ec14c8eb2c305e768c6d1
Parents: 3e3d56e ce8c9b5
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 25 09:31:10 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Mon Sep 25 09:31:10 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/config/CFMetaData.java | 199 +++-
 .../apache/cassandra/cql3/AbstractMarker.java   |   9 +-
 .../apache/cassandra/cql3/ColumnCondition.java  |   6 +
 .../apache/cassandra/cql3/ColumnConditions.java |   6 +
 .../org/apache/cassandra/cql3/Constants.java    |   3 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  85 ++
 .../cassandra/cql3/MultiColumnRelation.java     |  64 +-
 .../org/apache/cassandra/cql3/Operation.java    |  81 ++
 .../org/apache/cassandra/cql3/Relation.java     |   9 +
 .../cassandra/cql3/SingleColumnRelation.java    |  76 +-
 .../cql3/SuperColumnCompatibility.java          | 763 +++++++++++++++
 .../apache/cassandra/cql3/UpdateParameters.java |   9 +-
 .../org/apache/cassandra/cql3/WhereClause.java  |  13 +-
 .../ClusteringColumnRestrictions.java           |   4 +
 .../restrictions/SingleColumnRestriction.java   | 207 +++-
 .../restrictions/StatementRestrictions.java     |  27 +-
 .../cassandra/cql3/restrictions/TermSlice.java  |   2 +-
 .../cql3/statements/CreateViewStatement.java    |   2 +
 .../cql3/statements/DeleteStatement.java        |  30 +-
 .../cql3/statements/ModificationStatement.java  |  15 +-
 .../cql3/statements/SelectStatement.java        |  56 +-
 .../cql3/statements/UpdateStatement.java        |  88 +-
 src/java/org/apache/cassandra/db/Columns.java   |   1 +
 .../org/apache/cassandra/db/CompactTables.java  |  46 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   3 +-
 .../cassandra/db/SerializationHeader.java       |   1 +
 .../cassandra/schema/LegacySchemaMigrator.java  |  39 +-
 .../cassandra/thrift/CassandraServer.java       |   7 +-
 .../cassandra/thrift/ThriftConversion.java      |  14 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  19 +-
 .../org/apache/cassandra/cql3/ViewTest.java     |  29 +
 .../cql3/validation/ThriftIntegrationTest.java  | 942 +++++++++++++++++++
 .../validation/operations/ThriftCQLTester.java  |  90 ++
 .../db/ColumnFamilyStoreCQLHelperTest.java      |  12 +-
 .../schema/LegacySchemaMigratorTest.java        |  13 +-
 36 files changed, 2772 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8fc72fc,7745e8c..06d62b8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,7 +1,18 @@@
 -3.0.15
 - * Fix support for SuperColumn tables (CASSANDRA-12373)
 +3.11.1
++=======
   * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 - * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
++ * Fix support for SuperColumn tables (CASSANDRA-12373)
   * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
   * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 14a605b,fd1c9e5..77cad1a
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -40,9 -40,9 +41,10 @@@ import org.apache.commons.lang3.builder
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.auth.DataResource;
  import org.apache.cassandra.cql3.ColumnIdentifier;
  import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.SuperColumnCompatibility;
  import org.apache.cassandra.cql3.statements.CFStatement;
  import org.apache.cassandra.cql3.statements.CreateTableStatement;
  import org.apache.cassandra.db.*;
@@@ -120,11 -117,33 +122,38 @@@ public final class CFMetaDat
      // for those tables in practice).
      private volatile ColumnDefinition compactValueColumn;
  
 +    public final DataResource resource;
 +
 +    //For hot path serialization it's often easier to store this info here
 +    private volatile ColumnFilter allColumnFilter;
 +
+     /**
+      * These two columns are "virtual" (e.g. not persisted together with schema).
+      *
+      * They are stored here to avoid re-creating during SELECT and UPDATE queries, where
+      * they are used to allow presenting supercolumn families in the CQL-compatible
+      * format. See {@link SuperColumnCompatibility} for more details.
+      **/
+     private volatile ColumnDefinition superCfKeyColumn;
+     private volatile ColumnDefinition superCfValueColumn;
+ 
+     public boolean isSuperColumnKeyColumn(ColumnDefinition cd)
+     {
+         return cd.name.equals(superCfKeyColumn.name);
+     }
+ 
+     public boolean isSuperColumnValueColumn(ColumnDefinition cd)
+     {
+         return cd.name.equals(superCfValueColumn.name);
+     }
+ 
+     public ColumnDefinition superColumnValueColumn()
+     {
+         return superCfValueColumn;
+     }
+ 
+     public ColumnDefinition superColumnKeyColumn() { return superCfKeyColumn; }
+ 
      /*
       * All of these methods will go away once CFMetaData becomes completely immutable.
       */
@@@ -297,10 -316,14 +329,15 @@@
          this.clusteringColumns = clusteringColumns;
          this.partitionColumns = partitionColumns;
  
+         this.superCfKeyColumn = superCfKeyColumn;
+         this.superCfValueColumn = superCfValueColumn;
+ 
+         //This needs to happen before serializers are set
+         //because they use comparator.subtypes()
          rebuild();
  
-         this.resource = DataResource.table(ksName, cfName);
          this.serializers = new Serializers(this);
++        this.resource = DataResource.table(ksName, cfName);
      }
  
      // This rebuild informations that are intrinsically duplicate of the table definition but
@@@ -323,10 -370,10 +384,12 @@@
          List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns);
          this.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
  
-         if (isCompactTable())
-             this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper());
+         if (isSuper())
+             this.comparator = new ClusteringComparator(clusteringColumns.get(0).type);
+         else
+             this.comparator = new ClusteringComparator(extractTypes(clusteringColumns));
 +
 +        this.allColumnFilter = ColumnFilter.all(this);
      }
  
      public Indexes getIndexes()
@@@ -384,12 -426,14 +447,14 @@@
                                partitions,
                                clusterings,
                                builder.build(),
-                               partitioner);
+                               partitioner,
+                               null,
+                               null);
      }
  
 -    private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns)
 +    public static List<AbstractType<?>> extractTypes(Iterable<ColumnDefinition> clusteringColumns)
      {
 -        List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size());
 +        List<AbstractType<?>> types = new ArrayList<>();
          for (ColumnDefinition def : clusteringColumns)
              types.add(def.type);
          return types;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/AbstractMarker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
index 4ddfabb,1bfac3f..fb86e7b
--- a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
@@@ -248,4 -239,65 +249,65 @@@ public class MultiColumnRelation extend
                        .append(valuesOrMarker)
                        .toString();
      }
+ 
+     @Override
+     public Relation toSuperColumnAdapter()
+     {
+         return new SuperColumnMultiColumnRelation(entities, relationType, valuesOrMarker, inValues, inMarker);
+     }
+ 
+     /**
+      * Required for SuperColumn compatibility, in order to map the SuperColumn key restrictions from the regular
+      * column to the collection key one.
+      */
+     private class SuperColumnMultiColumnRelation extends MultiColumnRelation
+     {
 -        private SuperColumnMultiColumnRelation(List<ColumnIdentifier.Raw> entities, Operator relationType, MultiColumnRaw valuesOrMarker, List<? extends MultiColumnRaw> inValues, Tuples.INRaw inMarker)
++        private SuperColumnMultiColumnRelation(List<ColumnDefinition.Raw> entities, Operator relationType, MultiColumnRaw valuesOrMarker, List<? extends MultiColumnRaw> inValues, Tuples.INRaw inMarker)
+         {
+             super(entities, relationType, valuesOrMarker, inValues, inMarker);
+         }
+ 
+         @Override
+         protected Restriction newSliceRestriction(CFMetaData cfm,
+                                                   VariableSpecifications boundNames,
+                                                   Bound bound,
+                                                   boolean inclusive) throws InvalidRequestException
+         {
+             assert cfm.isSuper() && cfm.isDense();
+             List<ColumnDefinition> receivers = receivers(cfm);
+             Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+             return new SingleColumnRestriction.SuperColumnMultiSliceRestriction(receivers.get(0), bound, inclusive, term);
+         }
+ 
+         @Override
+         protected Restriction newEQRestriction(CFMetaData cfm,
+                                                VariableSpecifications boundNames) throws InvalidRequestException
+         {
+             assert cfm.isSuper() && cfm.isDense();
+             List<ColumnDefinition> receivers = receivers(cfm);
+             Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+             return new SingleColumnRestriction.SuperColumnMultiEQRestriction(receivers.get(0), term);
+         }
+ 
+         @Override
+         protected List<ColumnDefinition> receivers(CFMetaData cfm) throws InvalidRequestException
+         {
+             assert cfm.isSuper() && cfm.isDense();
+             List<ColumnDefinition> names = new ArrayList<>(getEntities().size());
+ 
 -            for (ColumnIdentifier.Raw raw : getEntities())
++            for (ColumnDefinition.Raw raw : getEntities())
+             {
 -                ColumnDefinition def = toColumnDefinition(cfm, raw);
++                ColumnDefinition def = raw.prepare(cfm);
+ 
+                 checkTrue(def.isClusteringColumn() ||
+                           cfm.isSuperColumnKeyColumn(def),
+                           "Multi-column relations can only be applied to clustering columns but was applied to: %s", def.name);
+ 
+                 checkFalse(names.contains(def), "Column \"%s\" appeared twice in a relation: %s", def.name, this);
+ 
+                 names.add(def);
+             }
+             return names;
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/Operation.java
index 85a1eb2,4b8d5ba..c005701
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@@ -246,46 -246,72 +251,112 @@@ public abstract class Operatio
          }
      }
  
 +    public static class SetField implements RawUpdate
 +    {
 +        private final FieldIdentifier field;
 +        private final Term.Raw value;
 +
 +        public SetField(FieldIdentifier field, Term.Raw value)
 +        {
 +            this.field = field;
 +            this.value = value;
 +        }
 +
 +        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) throws InvalidRequestException
 +        {
 +            if (!receiver.type.isUDT())
 +                throw new InvalidRequestException(String.format("Invalid operation (%s) for non-UDT column %s", toString(receiver), receiver.name));
 +            else if (!receiver.type.isMultiCell())
 +                throw new InvalidRequestException(String.format("Invalid operation (%s) for frozen UDT column %s", toString(receiver), receiver.name));
 +
 +            int fieldPosition = ((UserType) receiver.type).fieldPosition(field);
 +            if (fieldPosition == -1)
 +                throw new InvalidRequestException(String.format("UDT column %s does not have a field named %s", receiver.name, field));
 +
 +            Term val = value.prepare(cfm.ksName, UserTypes.fieldSpecOf(receiver, fieldPosition));
 +            return new UserTypes.SetterByField(receiver, field, val);
 +        }
 +
 +        protected String toString(ColumnSpecification column)
 +        {
 +            return String.format("%s.%s = %s", column.name, field, value);
 +        }
 +
 +        public boolean isCompatibleWith(RawUpdate other)
 +        {
 +            if (other instanceof SetField)
 +                return !((SetField) other).field.equals(field);
 +            else
 +                return !(other instanceof SetValue);
 +        }
 +    }
 +
+     // Currently only used internally counters support in SuperColumn families.
+     // Addition on the element level inside the collections are otherwise not supported in the CQL.
+     public static class ElementAddition implements RawUpdate
+     {
+         private final Term.Raw selector;
+         private final Term.Raw value;
+ 
+         public ElementAddition(Term.Raw selector, Term.Raw value)
+         {
+             this.selector = selector;
+             this.value = value;
+         }
+ 
 -        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
++        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) throws InvalidRequestException
+         {
+             assert receiver.type instanceof MapType;
 -            Term k = selector.prepare(keyspace, Maps.keySpecOf(receiver));
 -            Term v = value.prepare(keyspace, Maps.valueSpecOf(receiver));
++            Term k = selector.prepare(cfm.ksName, Maps.keySpecOf(receiver));
++            Term v = value.prepare(cfm.ksName, Maps.valueSpecOf(receiver));
+ 
+             return new Maps.AdderByKey(receiver, v, k);
+         }
+ 
+         protected String toString(ColumnSpecification column)
+         {
+             return String.format("%s = %s + %s", column.name, column.name, value);
+         }
+ 
+         public boolean isCompatibleWith(RawUpdate other)
+         {
+             return !(other instanceof SetValue);
+         }
+     }
+ 
+     // Currently only used internally counters support in SuperColumn families.
+     // Addition on the element level inside the collections are otherwise not supported in the CQL.
+     public static class ElementSubtraction implements RawUpdate
+     {
+         private final Term.Raw selector;
+         private final Term.Raw value;
+ 
+         public  ElementSubtraction(Term.Raw selector, Term.Raw value)
+         {
+             this.selector = selector;
+             this.value = value;
+         }
+ 
 -        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
++        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) throws InvalidRequestException
+         {
+             assert receiver.type instanceof MapType;
 -            Term k = selector.prepare(keyspace, Maps.keySpecOf(receiver));
 -            Term v = value.prepare(keyspace, Maps.valueSpecOf(receiver));
++            Term k = selector.prepare(cfm.ksName, Maps.keySpecOf(receiver));
++            Term v = value.prepare(cfm.ksName, Maps.valueSpecOf(receiver));
+ 
+             return new Maps.SubtracterByKey(receiver, v, k);
+         }
+ 
+         protected String toString(ColumnSpecification column)
+         {
+             return String.format("%s = %s + %s", column.name, column.name, value);
+         }
+ 
+         public boolean isCompatibleWith(RawUpdate other)
+         {
+             return !(other instanceof SetValue);
+         }
+     }
+ 
      public static class Addition implements RawUpdate
      {
          private final Term.Raw value;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
index e0ee519,455ae0c..2e9b41f
--- a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
@@@ -38,12 -37,12 +38,12 @@@ import static org.apache.cassandra.cql3
  
  /**
   * Relations encapsulate the relationship between an entity of some kind, and
 - * a value (term). For example, <key> > "start" or "colname1" = "somevalue".
 + * a value (term). For example, {@code <key> > "start" or "colname1" = "somevalue"}.
   *
   */
- public final class SingleColumnRelation extends Relation
+ public class SingleColumnRelation extends Relation
  {
 -    private final ColumnIdentifier.Raw entity;
 +    private final ColumnDefinition.Raw entity;
      private final Term.Raw mapKey;
      private final Term.Raw value;
      private final List<Term.Raw> inValues;
@@@ -321,6 -301,80 +321,80 @@@
  
      private boolean canHaveOnlyOneValue()
      {
 -        return isEQ() || (isIN() && inValues != null && inValues.size() == 1);
 +        return isEQ() || isLIKE() || (isIN() && inValues != null && inValues.size() == 1);
      }
+ 
+     @Override
+     public Relation toSuperColumnAdapter()
+     {
+         return new SuperColumnSingleColumnRelation(entity, mapKey, relationType, value);
+     }
+ 
+     /**
+      * Required for SuperColumn compatibility, in order to map the SuperColumn key restrictions from the regular
+      * column to the collection key one.
+      */
+     private class SuperColumnSingleColumnRelation extends SingleColumnRelation
+     {
 -        SuperColumnSingleColumnRelation(ColumnIdentifier.Raw entity, Raw mapKey, Operator type, Raw value)
++        SuperColumnSingleColumnRelation(ColumnDefinition.Raw entity, Raw mapKey, Operator type, Raw value)
+         {
+             super(entity, mapKey, type, value, inValues);
+         }
+ 
+         @Override
+         public Restriction newSliceRestriction(CFMetaData cfm,
+                                                VariableSpecifications boundNames,
+                                                Bound bound,
+                                                boolean inclusive) throws InvalidRequestException
+         {
 -            ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
++            ColumnDefinition columnDef = entity.prepare(cfm);
+             if (cfm.isSuperColumnKeyColumn(columnDef))
+             {
 -                Term term = toTerm(toReceivers(columnDef, cfm.isDense()), value, cfm.ksName, boundNames);
++                Term term = toTerm(toReceivers(columnDef), value, cfm.ksName, boundNames);
+                 return new SingleColumnRestriction.SuperColumnKeySliceRestriction(cfm.superColumnKeyColumn(), bound, inclusive, term);
+             }
+             else
+             {
+                 return super.newSliceRestriction(cfm, boundNames, bound, inclusive);
+             }
+         }
+ 
+         @Override
+         protected Restriction newEQRestriction(CFMetaData cfm,
+                                                VariableSpecifications boundNames) throws InvalidRequestException
+         {
 -            ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
++            ColumnDefinition columnDef = entity.prepare(cfm);
+             if (cfm.isSuperColumnKeyColumn(columnDef))
+             {
 -                Term term = toTerm(toReceivers(columnDef, cfm.isDense()), value, cfm.ksName, boundNames);
++                Term term = toTerm(toReceivers(columnDef), value, cfm.ksName, boundNames);
+                 return new SingleColumnRestriction.SuperColumnKeyEQRestriction(cfm.superColumnKeyColumn(), term);
+             }
+             else
+             {
+                 return super.newEQRestriction(cfm, boundNames);
+             }
+         }
+ 
+         @Override
+         protected Restriction newINRestriction(CFMetaData cfm,
+                                                VariableSpecifications boundNames) throws InvalidRequestException
+         {
 -            ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
++            ColumnDefinition columnDef = entity.prepare(cfm);
+             if (cfm.isSuperColumnKeyColumn(columnDef))
+             {
+                 List<? extends ColumnSpecification> receivers = Collections.singletonList(cfm.superColumnKeyColumn());
+                 List<Term> terms = toTerms(receivers, inValues, cfm.ksName, boundNames);
+                 if (terms == null)
+                 {
+                     Term term = toTerm(receivers, value, cfm.ksName, boundNames);
+                     return new SingleColumnRestriction.SuperColumnKeyINRestrictionWithMarkers(cfm.superColumnKeyColumn(), (Lists.Marker) term);
+                 }
+                 return new SingleColumnRestriction.SuperColumnKeyINRestrictionWithValues(cfm.superColumnKeyColumn(), terms);
+             }
+             else
+             {
+                 return super.newINRestriction(cfm, boundNames);
+             }
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java
index 0000000,d4c14df..1fe0af0
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java
+++ b/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java
@@@ -1,0 -1,765 +1,763 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.cql3;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.TreeSet;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.cql3.restrictions.Restriction;
+ import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
++import org.apache.cassandra.cql3.restrictions.SingleRestriction;
+ import org.apache.cassandra.cql3.restrictions.TermSlice;
+ import org.apache.cassandra.cql3.selection.Selection;
+ import org.apache.cassandra.cql3.statements.Bound;
+ import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.Columns;
+ import org.apache.cassandra.db.CompactTables;
+ import org.apache.cassandra.db.PartitionColumns;
+ import org.apache.cassandra.db.filter.ColumnFilter;
+ import org.apache.cassandra.db.filter.RowFilter;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.MapType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.rows.Cell;
+ import org.apache.cassandra.db.rows.CellPath;
+ import org.apache.cassandra.db.rows.ComplexColumnData;
+ import org.apache.cassandra.db.rows.Row;
+ import org.apache.cassandra.db.rows.RowIterator;
++import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+ import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+ import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+ import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+ import static org.apache.cassandra.cql3.statements.SelectStatement.getComponents;
+ 
+ /**
+  * Class incapsulating the helper logic to handle SELECT / UPDATE / INSERT special-cases related
+  * to SuperColumn tables in applicable scenarios.
+  *
+  * SuperColumn families have a special layout and are represented as a Map internally. These tables
+  * have two special columns (called `column2` and `value` by default):
+  *
+  *   * `column2`, {@link CFMetaData#superCfValueColumn}, a key of the SuperColumn map, exposed as a
+  *   REGULAR column, but stored in schema tables as a CLUSTERING column to make a distinction from
+  *   the SC value column in case of renames.
+  *   * `value`, {@link CFMetaData#compactValueColumn()}, a value of the SuperColumn map, exposed and
+  *   stored as a REGULAR column
+  *
+  * These columns have to be translated to this internal representation as key and value, correspondingly.
+  *
+  * In CQL terms, the SuperColumn families is encoded with:
+  *
+  *   CREATE TABLE super (
+  *      key [key_validation_class],
+  *      super_column_name [comparator],
+  *      [column_metadata_1] [type1],
+  *      ...,
+  *      [column_metadata_n] [type1],
+  *      "" map<[sub_comparator], [default_validation_class]>
+  *      PRIMARY KEY (key, super_column_name)
+  *   )
+  *
+  * In other words, every super column is encoded by a row. That row has one column for each defined
+  * "column_metadata", but it also has a special map column (whose name is the empty string as this is
+  * guaranteed to never conflict with a user-defined "column_metadata") which stores the super column
+  * "dynamic" sub-columns.
+  *
+  * On write path, `column2` and `value` columns are translated to the key and value of the
+  * underlying map. During the read, the inverse conversion is done. Deletes are converted into
+  * discards by the key in the underlying map. Counters are handled by translating an update to a
+  * counter update with a cell path. See {@link SuperColumnRestrictions} for the details.
+  *
+  * Since non-dense SuperColumn families do not modify the contents of the internal map through in CQL
+  * and do not expose this via CQL either, reads, writes and deletes are handled normally.
+  *
+  * Sidenote: a _dense_ SuperColumn Familiy is the one that has no added REGULAR columns.
+  */
+ public class SuperColumnCompatibility
+ {
+     // We use an empty value for the 1) this can't conflict with a user-defined column and 2) this actually
+     // validate with any comparator which makes it convenient for columnDefinitionComparator().
+     public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+     public static final String SUPER_COLUMN_MAP_COLUMN_STR = UTF8Type.instance.compose(SUPER_COLUMN_MAP_COLUMN);
+ 
+     /**
+      * Dense flag might have been incorrectly set if the node was upgraded from 2.x before CASSANDRA-12373.
+      *
+      * For 3.x created tables, the flag is set correctly in ThriftConversion code.
+      */
+     public static boolean recalculateIsDense(Columns columns)
+     {
+         return columns.size() == 1 && columns.getComplex(0).name.toString().isEmpty();
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families, the supercolumn key column has to be translated to the collection subselection
+      * query in order to avoid reading an entire collection and then filtering out the results.
+      */
+     public static ColumnFilter getColumnFilter(CFMetaData cfm, QueryOptions queryOptions, SuperColumnRestrictions restrictions)
+     {
+         assert cfm.isSuper() && cfm.isDense();
+ 
+         ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+         builder.add(cfm.compactValueColumn());
+ 
+         if (restrictions.keySliceRestriction != null)
+         {
+             SingleColumnRestriction.SuperColumnKeySliceRestriction restriction = restrictions.keySliceRestriction;
+             TermSlice slice = restriction.slice;
+ 
+             ByteBuffer start = slice.hasBound(Bound.START) ? slice.bound(Bound.START).bindAndGet(queryOptions) : null;
+             ByteBuffer end = slice.hasBound(Bound.END) ? slice.bound(Bound.END).bindAndGet(queryOptions) : null;
+ 
+             builder.slice(cfm.compactValueColumn(),
+                           start == null ? CellPath.BOTTOM : CellPath.create(start),
+                           end == null ? CellPath.TOP : CellPath.create(end));
+         }
+         else if (restrictions.keyEQRestriction != null)
+         {
+             SingleColumnRestriction.SuperColumnKeyEQRestriction restriction = restrictions.keyEQRestriction;
+             ByteBuffer value = restriction.bindValue(queryOptions);
+             builder.select(cfm.compactValueColumn(), CellPath.create(value));
+         }
+         else if (restrictions.keyINRestriction != null)
+         {
+             SingleColumnRestriction.SuperColumnKeyINRestriction cast = restrictions.keyINRestriction;
+             Set<ByteBuffer> keyINRestrictionValues = new TreeSet<ByteBuffer>(((MapType) cfm.compactValueColumn().type).getKeysType());
+             keyINRestrictionValues.addAll(cast.getValues(queryOptions));
+ 
+             for (ByteBuffer value : keyINRestrictionValues)
+                 builder.select(cfm.compactValueColumn(), CellPath.create(value));
+         }
+         else if (restrictions.multiEQRestriction != null)
+         {
+             SingleColumnRestriction.SuperColumnMultiEQRestriction restriction = restrictions.multiEQRestriction;
+             ByteBuffer value = restriction.secondValue;
+             builder.select(cfm.compactValueColumn(), CellPath.create(value));
+         }
+ 
+         return builder.build();
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families.
+      *
+      * On read path, instead of writing row per map, we have to write a row per key/value pair in map.
+      *
+      * For example:
+      *
+      *   | partition-key | clustering-key | { key1: value1, key2: value2 } |
+      *
+      * Will be translated to:
+      *
+      *   | partition-key | clustering-key | key1 | value1 |
+      *   | partition-key | clustering-key | key2 | value2 |
+      *
+      */
 -    public static void processPartition(CFMetaData cfm, Selection selection, RowIterator partition, Selection.ResultSetBuilder result, int protocolVersion,
++    public static void processPartition(CFMetaData cfm, Selection selection, RowIterator partition, Selection.ResultSetBuilder result, ProtocolVersion protocolVersion,
+                                         SuperColumnRestrictions restrictions, QueryOptions queryOptions)
+     {
+         assert cfm.isDense();
+         ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
+ 
+         int nowInSeconds = FBUtilities.nowInSeconds();
+         while (partition.hasNext())
+         {
+             Row row = partition.next();
+ 
+             ComplexColumnData ccd = row.getComplexColumnData(cfm.compactValueColumn());
+ 
+             if (ccd == null)
+                 continue;
+ 
+             Iterator<Cell> cellIter = ccd.iterator();
+ 
+             outer:
+             while (cellIter.hasNext())
+             {
+                 Cell cell = cellIter.next();
+                 ByteBuffer superColumnKey = cell.path().get(0);
+ 
+                 if (restrictions != null)
+                 {
+                     // Slice on SuperColumn key
+                     if (restrictions.keySliceRestriction != null)
+                     {
+                         for (Bound bound : Bound.values())
+                         {
+                             if (restrictions.keySliceRestriction.hasBound(bound) &&
+                                 !restrictions.keySliceRestriction.isInclusive(bound))
+                             {
+                                 ByteBuffer excludedValue = restrictions.keySliceRestriction.bindValue(queryOptions);
+                                 if (excludedValue.equals(superColumnKey))
+                                     continue outer;
+                             }
+                         }
+                     }
+ 
+                     // Multi-column restriction on clustering+SuperColumn key
+                     if (restrictions.multiSliceRestriction != null &&
 -                        cfm.comparator.compare(row.clustering(), new Clustering(restrictions.multiSliceRestriction.firstValue)) == 0)
++                        cfm.comparator.compare(row.clustering(), Clustering.make(restrictions.multiSliceRestriction.firstValue)) == 0)
+                     {
+                         AbstractType t = ((MapType) cfm.compactValueColumn().type).getKeysType();
+                         int cmp = t.compare(superColumnKey, restrictions.multiSliceRestriction.secondValue);
+ 
+                         if ((cmp == 0 && !restrictions.multiSliceRestriction.trueInclusive) ||     // EQ
+                             (restrictions.multiSliceRestriction.hasBound(Bound.END) && cmp > 0) || // LT
+                             (restrictions.multiSliceRestriction.hasBound(Bound.START) && cmp < 0)) // GT
+                             continue outer;
+                     }
+                 }
+ 
 -                result.newRow(protocolVersion);
++                Row staticRow = partition.staticRow();
++                result.newRow(partition.partitionKey(), staticRow.clustering());
+ 
+                 for (ColumnDefinition def : selection.getColumns())
+                 {
+                     if (cfm.isSuperColumnKeyColumn(def))
+                     {
+                         result.add(superColumnKey);
+                     }
+                     else if (cfm.isSuperColumnValueColumn(def))
+                     {
+                         result.add(cell, nowInSeconds);
+                     }
+                     else
+                     {
+                         switch (def.kind)
+                         {
+                             case PARTITION_KEY:
+                                 result.add(keyComponents[def.position()]);
+                                 break;
+                             case CLUSTERING:
+                                 result.add(row.clustering().get(def.position()));
+                                 break;
+                             case REGULAR:
+                             case STATIC:
+                                 throw new AssertionError(String.format("Invalid column '%s' found in SuperColumn table", def.name.toString()));
+                         }
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families.
+      *
+      * On the write path, we have to do combine the columns into a key/value pair:
+      *
+      * So inserting a row:
+      *
+      *     | partition-key | clustering-key | key1 | value1 |
+      *
+      * Would result into:
+      *
+      *     | partition-key | clustering-key | {key1: value1} |
+      *
+      * or adding / overwriting the value for `key1`.
+      */
+     public static void prepareInsertOperations(CFMetaData cfm,
 -                                               List<ColumnIdentifier.Raw> columnNames,
++                                               List<ColumnDefinition.Raw> columnNames,
+                                                WhereClause.Builder whereClause,
+                                                List<Term.Raw> columnValues,
+                                                VariableSpecifications boundNames,
+                                                Operations operations)
+     {
+         List<ColumnDefinition> defs = new ArrayList<>(columnNames.size());
+         for (int i = 0; i < columnNames.size(); i++)
+         {
 -            ColumnIdentifier id = columnNames.get(i).prepare(cfm);
 -            defs.add(cfm.getColumnDefinition(id));
++            ColumnDefinition id = columnNames.get(i).prepare(cfm);
++            defs.add(id);
+         }
+ 
+         prepareInsertOperations(cfm, defs, boundNames, columnValues, whereClause, operations);
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families.
+      *
+      * {@link #prepareInsertOperations(CFMetaData, List, VariableSpecifications, List, WhereClause.Builder, Operations)},
+      * but for INSERT JSON queries
+      */
+     public static void prepareInsertJSONOperations(CFMetaData cfm,
+                                                    List<ColumnDefinition> defs,
+                                                    VariableSpecifications boundNames,
+                                                    Json.Prepared prepared,
+                                                    WhereClause.Builder whereClause,
+                                                    Operations operations)
+     {
+         List<Term.Raw> columnValues = new ArrayList<>(defs.size());
+         for (ColumnDefinition def : defs)
 -            columnValues.add(prepared.getRawTermForColumn(def));
++            columnValues.add(prepared.getRawTermForColumn(def, true));
+ 
+         prepareInsertOperations(cfm, defs, boundNames, columnValues, whereClause, operations);
+     }
+ 
+     private static void prepareInsertOperations(CFMetaData cfm,
+                                                 List<ColumnDefinition> defs,
+                                                 VariableSpecifications boundNames,
+                                                 List<Term.Raw> columnValues,
+                                                 WhereClause.Builder whereClause,
+                                                 Operations operations)
+     {
+         assert cfm.isDense();
+         assert defs.size() == columnValues.size();
+ 
+         Term.Raw superColumnKey = null;
+         Term.Raw superColumnValue = null;
+ 
+         for (int i = 0, size = defs.size(); i < size; i++)
+         {
+             ColumnDefinition def = defs.get(i);
+             Term.Raw raw = columnValues.get(i);
+ 
+             if (cfm.isSuperColumnKeyColumn(def))
+             {
+                 superColumnKey = raw;
+                 collectMarkerSpecifications(raw, boundNames, def);
+             }
+             else if (cfm.isSuperColumnValueColumn(def))
+             {
+                 superColumnValue = raw;
+                 collectMarkerSpecifications(raw, boundNames, def);
+             }
+             else if (def.isPrimaryKeyColumn())
+             {
 -                whereClause.add(new SingleColumnRelation(new ColumnIdentifier.ColumnIdentifierValue(def.name), Operator.EQ, raw));
++                whereClause.add(new SingleColumnRelation(ColumnDefinition.Raw.forColumn(def), Operator.EQ, raw));
+             }
+             else
+             {
+                 throw invalidRequest("Invalid column {} in where clause");
+             }
+         }
+ 
+         checkTrue(superColumnValue != null,
+                   "Column value is mandatory for SuperColumn tables");
+         checkTrue(superColumnKey != null,
+                   "Column key is mandatory for SuperColumn tables");
+ 
 -        Operation operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn());
++        Operation operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm, cfm.compactValueColumn());
+         operations.add(operation);
+     }
+ 
+     /**
+      * Collect the marker specifications for the bound columns manually, since the operations on a column are
+      * converted to the operations on the collection element.
+      */
+     private static void collectMarkerSpecifications(Term.Raw raw, VariableSpecifications boundNames, ColumnDefinition def)
+     {
+         if (raw instanceof AbstractMarker.Raw)
+             boundNames.add(((AbstractMarker.Raw) raw).bindIndex(), def);
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families.
+      *
+      * During UPDATE operation, the update by clustering (with correponding relation in WHERE clause)
+      * has to be substituted with an update to the map that backs the given SuperColumn.
+      *
+      * For example, an update such as:
+      *
+      *     UPDATE ... SET value = 'value1' WHERE key = 'pk' AND column1 = 'ck' AND column2 = 'mk'
+      *
+      * Will update the value under key 'mk' in the map, backing the SuperColumn, located in the row
+      * with clustering 'ck' in the partition with key 'pk'.
+      */
+     public static WhereClause prepareUpdateOperations(CFMetaData cfm,
+                                                       WhereClause whereClause,
 -                                                      List<Pair<ColumnIdentifier.Raw, Operation.RawUpdate>> updates,
++                                                      List<Pair<ColumnDefinition.Raw, Operation.RawUpdate>> updates,
+                                                       VariableSpecifications boundNames,
+                                                       Operations operations)
+     {
+         assert cfm.isDense();
+         Term.Raw superColumnKey = null;
+         Term.Raw superColumnValue = null;
+ 
+         List<Relation> newRelations = new ArrayList<>(whereClause.relations.size());
+         for (int i = 0; i < whereClause.relations.size(); i++)
+         {
+             SingleColumnRelation relation = (SingleColumnRelation) whereClause.relations.get(i);
 -            ColumnIdentifier id = relation.getEntity().prepare(cfm);
 -            ColumnDefinition def = cfm.getColumnDefinition(id);
++            ColumnDefinition def = relation.getEntity().prepare(cfm);
+ 
+             if (cfm.isSuperColumnKeyColumn(def))
+             {
+                 superColumnKey = relation.getValue();
+                 collectMarkerSpecifications(superColumnKey, boundNames, def);
+             }
+             else
+             {
+                 newRelations.add(relation);
+             }
+         }
+ 
+         checkTrue(superColumnKey != null,
+                   "Column key is mandatory for SuperColumn tables");
+ 
 -        for (Pair<ColumnIdentifier.Raw, Operation.RawUpdate> entry : updates)
++        for (Pair<ColumnDefinition.Raw, Operation.RawUpdate> entry : updates)
+         {
 -            ColumnIdentifier id = entry.left.prepare(cfm);
 -            ColumnDefinition def = cfm.getColumnDefinition(id);
++            ColumnDefinition def = entry.left.prepare(cfm);
+ 
+             if (!cfm.isSuperColumnValueColumn(def))
+                 throw invalidRequest("Column `%s` of type `%s` found in SET part", def.name, def.type.asCQL3Type());
+ 
+             Operation operation;
+ 
+             if (entry.right instanceof Operation.Addition)
+             {
+                 Operation.Addition op = (Operation.Addition) entry.right;
+                 superColumnValue = op.value();
+ 
 -                operation = new Operation.ElementAddition(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn());
++                operation = new Operation.ElementAddition(superColumnKey, superColumnValue).prepare(cfm, cfm.compactValueColumn());
+             }
+             else if (entry.right instanceof Operation.Substraction)
+             {
+                 Operation.Substraction op = (Operation.Substraction) entry.right;
+                 superColumnValue = op.value();
+ 
 -                operation = new Operation.ElementSubtraction(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn());
++                operation = new Operation.ElementSubtraction(superColumnKey, superColumnValue).prepare(cfm, cfm.compactValueColumn());
+             }
+             else if (entry.right instanceof Operation.SetValue)
+             {
+                 Operation.SetValue op = (Operation.SetValue) entry.right;
+                 superColumnValue = op.value();
+ 
 -                operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn());
++                operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm, cfm.compactValueColumn());
+             }
+             else
+             {
+                 throw invalidRequest("Invalid operation `%s` on column `%s` of type `%s` found in SET part", entry.right, def.name, def.type.asCQL3Type());
+             }
+ 
+             collectMarkerSpecifications(superColumnValue, boundNames, def);
+             operations.add(operation);
+         }
+ 
+         checkTrue(superColumnValue != null,
+                   "Column value is mandatory for SuperColumn tables");
+ 
+         return newRelations.size() != whereClause.relations.size() ? whereClause.copy(newRelations) : whereClause;
+     }
+ 
+     /**
+      * Rebuilds LWT conditions on SuperColumn _value_ column.
+      *
+      * Conditions have to be changed to correspond the internal representation of SuperColumn value, since it's not
+      * a separate column, but a value in a hidden compact value column.
+      */
+     public static Conditions rebuildLWTColumnConditions(Conditions conditions, CFMetaData cfm, WhereClause whereClause)
+     {
+         if (conditions.isEmpty() || conditions.isIfExists() || conditions.isIfNotExists())
+             return conditions;
+ 
+         ColumnConditions.Builder builder = ColumnConditions.newBuilder();
+         Collection<ColumnCondition> columnConditions = ((ColumnConditions) conditions).columnConditions();
+ 
+         Pair<ColumnDefinition, Relation> superColumnKeyRelation = SuperColumnCompatibility.getSuperColumnKeyRelation(whereClause.relations, cfm);
+ 
+         checkNotNull(superColumnKeyRelation,
+                      "Lightweight transactions on SuperColumn tables are only supported with supplied SuperColumn key");
+ 
+         for (ColumnCondition columnCondition : columnConditions)
+         {
+             checkTrue(cfm.isSuperColumnValueColumn(columnCondition.column),
+                       "Lightweight transactions are only supported on the value column of SuperColumn tables");
+ 
+             Term.Raw value = superColumnKeyRelation.right.getValue();
+             Term collectionElemnt = value instanceof AbstractMarker.Raw ?
+                                     new Constants.Marker(((AbstractMarker.Raw) value).bindIndex(),
+                                                          superColumnKeyRelation.left) :
+                                     value.prepare(cfm.ksName, superColumnKeyRelation.left);
+             builder.add(ColumnCondition.condition(cfm.compactValueColumn(),
+                                                   collectionElemnt,
+                                                   columnCondition.value(), columnCondition.operator));
+         }
+ 
+         return builder.build();
+     }
+ 
+     /**
+      * Returns a relation on the SuperColumn key
+      */
+     private static Pair<ColumnDefinition, Relation> getSuperColumnKeyRelation(List<Relation> relations, CFMetaData cfm)
+     {
+         for (int i = 0; i < relations.size(); i++)
+         {
+             SingleColumnRelation relation = (SingleColumnRelation) relations.get(i);
 -            ColumnIdentifier id = relation.getEntity().prepare(cfm);
 -            ColumnDefinition def = cfm.getColumnDefinition(id);
++            ColumnDefinition def = relation.getEntity().prepare(cfm);
+ 
+             if (cfm.isSuperColumnKeyColumn(def))
+                 return Pair.create(def, relation);
+         }
+         return null;
+     }
+ 
+     /**
+      * For _dense_ SuperColumn Families.
+      *
+      * Delete, when the "regular" columns are present, have to be translated into
+      * deletion of value in the internal map by key.
+      *
+      * For example, delete such as:
+      *
+      *     DELETE FROM ... WHERE key = 'pk' AND column1 = 'ck' AND column2 = 'mk'
+      *
+      * Will delete a value under 'mk' from the map, located in the row with clustering key 'ck' in the partition
+      * with key 'pk'.
+      */
+     public static WhereClause prepareDeleteOperations(CFMetaData cfm,
+                                                       WhereClause whereClause,
+                                                       VariableSpecifications boundNames,
+                                                       Operations operations)
+     {
+         assert cfm.isDense();
+         List<Relation> newRelations = new ArrayList<>(whereClause.relations.size());
+ 
+         for (int i = 0; i < whereClause.relations.size(); i++)
+         {
+             Relation orig = whereClause.relations.get(i);
+ 
+             checkFalse(orig.isMultiColumn(),
+                        "Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", orig);
+             checkFalse(orig.onToken(),
+                        "Token relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", orig);
+ 
+             SingleColumnRelation relation = (SingleColumnRelation) orig;
 -            ColumnIdentifier id = relation.getEntity().prepare(cfm);
 -            ColumnDefinition def = cfm.getColumnDefinition(id);
++            ColumnDefinition def = relation.getEntity().prepare(cfm);
+ 
+             if (cfm.isSuperColumnKeyColumn(def))
+             {
+                 Term.Raw value = relation.getValue();
+ 
+                 if (value instanceof AbstractMarker.Raw)
+                     boundNames.add(((AbstractMarker.Raw) value).bindIndex(), def);
+ 
+                 Operation operation = new Maps.DiscarderByKey(cfm.compactValueColumn(), value.prepare(cfm.ksName, def));
+                 operations.add(operation);
+             }
+             else
+             {
+                 newRelations.add(relation);
+             }
+         }
+ 
+         return newRelations.size() != whereClause.relations.size() ? whereClause.copy(newRelations) : whereClause;
+     }
+ 
+     /**
+      * Create a column name generator for SuperColumns
+      */
+     public static CompactTables.DefaultNames columnNameGenerator(List<ColumnDefinition> partitionKeyColumns,
+                                                                  List<ColumnDefinition> clusteringColumns,
+                                                                  PartitionColumns partitionColumns)
+     {
+         Set<String> names = new HashSet<>();
+         // If the clustering column was renamed, the supercolumn key's default nname still can't be `column1` (SuperColumn
+         // key renames are handled separately by looking up an existing column).
+         names.add("column1");
+         for (ColumnDefinition columnDefinition: partitionKeyColumns)
+             names.add(columnDefinition.name.toString());
+         for (ColumnDefinition columnDefinition: clusteringColumns)
+             names.add(columnDefinition.name.toString());
+         for (ColumnDefinition columnDefinition: partitionColumns)
+             names.add(columnDefinition.name.toString());
+ 
+         return CompactTables.defaultNameGenerator(names);
+     }
+ 
+     /**
+      * Find a SuperColumn key column if it's available (for example, when it was renamed) or create one with a default name.
+      */
+     public static ColumnDefinition getSuperCfKeyColumn(CFMetaData cfm, List<ColumnDefinition> clusteringColumns, CompactTables.DefaultNames defaultNames)
+     {
+         assert cfm.isDense();
+ 
+         MapType mapType = (MapType) cfm.compactValueColumn().type;
+         // Pre CASSANDRA-12373 3.x-created supercolumn family
+         if (clusteringColumns.size() == 1)
+         {
+             // create a new one with a default name
+             ColumnIdentifier identifier = ColumnIdentifier.getInterned(defaultNames.defaultClusteringName(), true);
+             return new ColumnDefinition(cfm.ksName, cfm.cfName, identifier, mapType.getKeysType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR);
+         }
+ 
+         // Upgrade path: table created in 2.x, handle pre-created columns and/or renames.
+         assert clusteringColumns.size() == 2 : clusteringColumns;
+         ColumnDefinition cd = clusteringColumns.get(1);
+ 
+         assert cd.type.equals(mapType.getKeysType()) : cd.type + " != " + mapType.getKeysType();
+         return new ColumnDefinition(cfm.ksName, cfm.cfName, cd.name, mapType.getKeysType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR);
+     }
+ 
+     /**
+      * Find a SuperColumn value column if it's available (for example, when it was renamed) or create one with a default name.
+      */
+     public static ColumnDefinition getSuperCfValueColumn(CFMetaData cfm, PartitionColumns partitionColumns, ColumnDefinition superCfKeyColumn, CompactTables.DefaultNames defaultNames)
+     {
+         assert cfm.isDense();
+ 
+         MapType mapType = (MapType) cfm.compactValueColumn().type;
+         for (ColumnDefinition def: partitionColumns.regulars)
+         {
+             if (!def.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN) && def.type.equals(mapType.getValuesType()) && !def.equals(superCfKeyColumn))
+                 return def;
+         }
+ 
+         ColumnIdentifier identifier = ColumnIdentifier.getInterned(defaultNames.defaultCompactValueName(), true);
+         return new ColumnDefinition(cfm.ksName, cfm.cfName, identifier, mapType.getValuesType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR);
+     }
+ 
+     /**
+      * SuperColumn key is stored in {@link CFMetaData#columnMetadata} as a clustering column (to make sure we can make
+      * a distinction between the SuperColumn key and SuperColumn value columns, especially when they have the same type
+      * and were renamed), but exposed as {@link CFMetaData#superCfKeyColumn} as a regular column to be compatible with
+      * the storage engine.
+      *
+      * This remapping is necessary to facilitate the column metadata part.
+      */
+     public static ColumnDefinition getSuperCfSschemaRepresentation(ColumnDefinition superCfKeyColumn)
+     {
+         return new ColumnDefinition(superCfKeyColumn.ksName, superCfKeyColumn.cfName, superCfKeyColumn.name, superCfKeyColumn.type, 1, ColumnDefinition.Kind.CLUSTERING);
+     }
+ 
+     public static boolean isSuperColumnMapColumn(ColumnDefinition column)
+     {
+         return column.isRegular() && column.name.bytes.equals(SuperColumnCompatibility.SUPER_COLUMN_MAP_COLUMN);
+     }
+ 
+     public static ColumnDefinition getCompactValueColumn(PartitionColumns columns)
+     {
+         for (ColumnDefinition column : columns.regulars)
+         {
+             if (isSuperColumnMapColumn(column))
+                 return column;
+         }
+         throw new AssertionError("Invalid super column table definition, no 'dynamic' map column");
+     }
+ 
+     /**
+      * Restrictions are the trickiest part of the SuperColumn integration.
+      * See specific docs on each field. For the purpose of this doc, the "default" column names are used,
+      * `column2` and `value`. Detailed description and semantics of these fields can be found in this class'
+      * header comment.
+      */
+     public static class SuperColumnRestrictions
+     {
+         /**
+          * Restrictions in the form of:
+          *   ... AND (column1, column2) > ('value1', 1)
+          * Multi-column restrictions. `column1` will be handled normally by the clustering bounds,
+          * and `column2` value has to be "saved" and filtered out in `processPartition`, as there's no
+          * direct mapping of multi-column restrictions to clustering + cell path. The first row
+          * is special-cased to make sure the semantics of multi-column restrictions are preserved.
+          */
+         private final SingleColumnRestriction.SuperColumnMultiSliceRestriction multiSliceRestriction;
+ 
+         /**
+          * Restrictions in the form of:
+          *   ... AND (column1, column2) = ('value1', 1)
+          * Multi-column restriction with EQ does have a direct mapping: `column1` will be handled
+          * normally by the clustering bounds, and the `column2` will be special-cased by the
+          * {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} as a collection path lookup.
+          */
+         private final SingleColumnRestriction.SuperColumnMultiEQRestriction multiEQRestriction;
+ 
+         /**
+          * Restrictions in the form of:
+          *   ... AND column2 >= 5
+          * For non-filtering cases (when the preceding clustering column and a partition key are
+          * restricted), will be handled in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)}
+          * like an inclusive bounds lookup.
+          *
+          * For the restrictions taking a form of
+          *   ... AND column2 > 5
+          * (non-inclusive ones), the items that match `=` will be filtered out
 -         * by {@link #processPartition(CFMetaData, Selection, RowIterator, Selection.ResultSetBuilder, int, SuperColumnRestrictions, QueryOptions)}
++         * by {@link #processPartition(CFMetaData, Selection, RowIterator, Selection.ResultSetBuilder, ProtocolVersion, SuperColumnRestrictions, QueryOptions)}
+          *
+          * Unfortunately, there are no good ways to do it other than here:
+          * {@link RowFilter} can't be used in this case, since the complex collection cells are not yet rows by that
+          * point.
+          * {@link ColumnFilter} (which is used for inclusive slices) can't be changed to support exclusive slices as it would
+          * require a protocol change in order to add a Kind. So exclusive slices are a combination of inclusive plus
+          * an ad-hoc filter.
+          */
+         private final SingleColumnRestriction.SuperColumnKeySliceRestriction keySliceRestriction;
+ 
+         /**
+          * Restrictions in the form of:
+          *   ... AND column2 IN (1, 2, 3)
+          * For non-filtering cases (when the preceeding clustering column and a partition key are
+          * restricted), are handled in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} by
+          * adding multiple collection paths to the {@link ColumnFilter}
+          */
+         private final SingleColumnRestriction.SuperColumnKeyINRestriction keyINRestriction;
+ 
+         /**
+          * Restrictions in the form of:
+          *   ... AND column2 = 1
+          * For non-filtering cases (when the preceeding clustering column and a partition key are
+          * restricted), will be handled by converting the restriction to the column filter on
+          * the collection key in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)}
+          */
+         private final SingleColumnRestriction.SuperColumnKeyEQRestriction keyEQRestriction;
+ 
 -        public SuperColumnRestrictions(Iterator<Restriction> restrictions)
++        public SuperColumnRestrictions(Iterator<SingleRestriction> restrictions)
+         {
+             // In order to keep the fields final, assignments have to be done outside the loop
+             SingleColumnRestriction.SuperColumnMultiSliceRestriction multiSliceRestriction = null;
+             SingleColumnRestriction.SuperColumnKeySliceRestriction keySliceRestriction = null;
+             SingleColumnRestriction.SuperColumnKeyINRestriction keyINRestriction = null;
+             SingleColumnRestriction.SuperColumnMultiEQRestriction multiEQRestriction = null;
+             SingleColumnRestriction.SuperColumnKeyEQRestriction keyEQRestriction = null;
+ 
+             while (restrictions.hasNext())
+             {
 -                Restriction restriction = restrictions.next();
++                SingleRestriction restriction = restrictions.next();
+ 
+                 if (restriction instanceof SingleColumnRestriction.SuperColumnMultiSliceRestriction)
+                     multiSliceRestriction = (SingleColumnRestriction.SuperColumnMultiSliceRestriction) restriction;
+                 else if (restriction instanceof SingleColumnRestriction.SuperColumnKeySliceRestriction)
+                     keySliceRestriction = (SingleColumnRestriction.SuperColumnKeySliceRestriction) restriction;
+                 else if (restriction instanceof SingleColumnRestriction.SuperColumnKeyINRestriction)
+                     keyINRestriction = (SingleColumnRestriction.SuperColumnKeyINRestriction) restriction;
+                 else if (restriction instanceof SingleColumnRestriction.SuperColumnMultiEQRestriction)
+                     multiEQRestriction = (SingleColumnRestriction.SuperColumnMultiEQRestriction) restriction;
+                 else if (restriction instanceof SingleColumnRestriction.SuperColumnKeyEQRestriction)
+                     keyEQRestriction = (SingleColumnRestriction.SuperColumnKeyEQRestriction) restriction;
+             }
+ 
+             this.multiSliceRestriction = multiSliceRestriction;
+             this.keySliceRestriction = keySliceRestriction;
+             this.keyINRestriction = keyINRestriction;
+             this.multiEQRestriction = multiEQRestriction;
+             this.keyEQRestriction = keyEQRestriction;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 49897ac,7d09506..ab61a0d
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@@ -170,7 -175,7 +175,7 @@@ public class UpdateParameter
          //
          // We set counterid to a special value to differentiate between regular pre-2.0 local shards from pre-2.1 era
          // and "counter update" temporary state cells. Please see CounterContext.createUpdate() for further details.
-         builder.addCell(BufferCell.live(column, timestamp, CounterContext.instance().createUpdate(increment)));
 -        builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createUpdate(increment), path));
++        builder.addCell(BufferCell.live(column, timestamp, CounterContext.instance().createUpdate(increment), path));
      }
  
      public void setComplexDeletionTime(ColumnDefinition column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index ed0d325,0000000..a8cc6bd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@@ -1,226 -1,0 +1,230 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.util.*;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * A set of restrictions on the clustering key.
 + */
 +final class ClusteringColumnRestrictions extends RestrictionSetWrapper
 +{
 +    /**
 +     * The composite type.
 +     */
 +    protected final ClusteringComparator comparator;
 +
 +    /**
 +     * <code>true</code> if filtering is allowed for this restriction, <code>false</code> otherwise
 +     */
 +    private final boolean allowFiltering;
 +
 +    public ClusteringColumnRestrictions(CFMetaData cfm)
 +    {
 +        this(cfm, false);
 +    }
 +
 +    public ClusteringColumnRestrictions(CFMetaData cfm, boolean allowFiltering)
 +    {
 +        this(cfm.comparator, new RestrictionSet(), allowFiltering);
 +    }
 +
 +    private ClusteringColumnRestrictions(ClusteringComparator comparator,
 +                                         RestrictionSet restrictionSet,
 +                                         boolean allowFiltering)
 +    {
 +        super(restrictionSet);
 +        this.comparator = comparator;
 +        this.allowFiltering = allowFiltering;
 +    }
 +
 +    public ClusteringColumnRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        SingleRestriction newRestriction = (SingleRestriction) restriction;
 +        RestrictionSet newRestrictionSet = restrictions.addRestriction(newRestriction);
 +
 +        if (!isEmpty() && !allowFiltering)
 +        {
 +            SingleRestriction lastRestriction = restrictions.lastRestriction();
 +            ColumnDefinition lastRestrictionStart = lastRestriction.getFirstColumn();
 +            ColumnDefinition newRestrictionStart = restriction.getFirstColumn();
 +
 +            checkFalse(lastRestriction.isSlice() && newRestrictionStart.position() > lastRestrictionStart.position(),
 +                       "Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                       newRestrictionStart.name,
 +                       lastRestrictionStart.name);
 +
 +            if (newRestrictionStart.position() < lastRestrictionStart.position() && newRestriction.isSlice())
 +                throw invalidRequest("PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                                     restrictions.nextColumn(newRestrictionStart).name,
 +                                     newRestrictionStart.name);
 +        }
 +
 +        return new ClusteringColumnRestrictions(this.comparator, newRestrictionSet, allowFiltering);
 +    }
 +
 +    private boolean hasMultiColumnSlice()
 +    {
 +        for (SingleRestriction restriction : restrictions)
 +        {
 +            if (restriction.isMultiColumn() && restriction.isSlice())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    public NavigableSet<Clustering> valuesAsClustering(QueryOptions options) throws InvalidRequestException
 +    {
 +        MultiCBuilder builder = MultiCBuilder.create(comparator, hasIN());
 +        for (SingleRestriction r : restrictions)
 +        {
 +            r.appendTo(builder, options);
 +            if (builder.hasMissingElements())
 +                break;
 +        }
 +        return builder.build();
 +    }
 +
 +    public NavigableSet<ClusteringBound> boundsAsClustering(Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        MultiCBuilder builder = MultiCBuilder.create(comparator, hasIN() || hasMultiColumnSlice());
 +        int keyPosition = 0;
 +
 +        for (SingleRestriction r : restrictions)
 +        {
 +            if (handleInFilter(r, keyPosition))
 +                break;
 +
 +            if (r.isSlice())
 +            {
 +                r.appendBoundTo(builder, bound, options);
 +                return builder.buildBoundForSlice(bound.isStart(),
 +                                                  r.isInclusive(bound),
 +                                                  r.isInclusive(bound.reverse()),
 +                                                  r.getColumnDefs());
 +            }
 +
 +            r.appendBoundTo(builder, bound, options);
 +
 +            if (builder.hasMissingElements())
 +                return BTreeSet.empty(comparator);
 +
 +            keyPosition = r.getLastColumn().position() + 1;
 +        }
 +
 +        // Everything was an equal (or there was nothing)
 +        return builder.buildBound(bound.isStart(), true);
 +    }
 +
 +    /**
 +     * Checks if any of the underlying restriction is a CONTAINS or CONTAINS KEY.
 +     *
 +     * @return <code>true</code> if any of the underlying restriction is a CONTAINS or CONTAINS KEY,
 +     * <code>false</code> otherwise
 +     */
 +    public final boolean hasContains()
 +    {
 +        for (SingleRestriction restriction : restrictions)
 +        {
 +            if (restriction.isContains())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Checks if any of the underlying restriction is a slice restrictions.
 +     *
 +     * @return <code>true</code> if any of the underlying restriction is a slice restrictions,
 +     * <code>false</code> otherwise
 +     */
 +    public final boolean hasSlice()
 +    {
 +        for (SingleRestriction restriction : restrictions)
 +        {
 +            if (restriction.isSlice())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Checks if underlying restrictions would require filtering
 +     *
 +     * @return <code>true</code> if any underlying restrictions require filtering, <code>false</code>
 +     * otherwise
 +     */
 +    public final boolean needFiltering()
 +    {
 +        int position = 0;
 +
 +        for (SingleRestriction restriction : restrictions)
 +        {
 +            if (handleInFilter(restriction, position))
 +                return true;
 +
 +            if (!restriction.isSlice())
 +                position = restriction.getLastColumn().position() + 1;
 +        }
 +        return hasContains();
 +    }
 +
 +    @Override
 +    public void addRowFilterTo(RowFilter filter,
 +                               SecondaryIndexManager indexManager,
 +                               QueryOptions options) throws InvalidRequestException
 +    {
 +        int position = 0;
 +
 +        for (SingleRestriction restriction : restrictions)
 +        {
 +            // We ignore all the clustering columns that can be handled by slices.
 +            if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexManager))
 +            {
 +                restriction.addRowFilterTo(filter, indexManager, options);
 +                continue;
 +            }
 +
 +            if (!restriction.isSlice())
 +                position = restriction.getLastColumn().position() + 1;
 +        }
 +    }
 +
 +    private boolean handleInFilter(SingleRestriction restriction, int index)
 +    {
 +        return restriction.isContains() || restriction.isLIKE() || index != restriction.getFirstColumn().position();
 +    }
 +
++    public Iterator<SingleRestriction> iterator()
++    {
++        return restrictions.iterator();
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4734ce7d/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 1d84331,5985962..09c02ed
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@@ -29,10 -27,9 +29,11 @@@ import org.apache.cassandra.cql3.functi
  import org.apache.cassandra.cql3.statements.Bound;
  import org.apache.cassandra.db.MultiCBuilder;
  import org.apache.cassandra.db.filter.RowFilter;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.index.Index;
  import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
  
  import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
  import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@@ -646,134 -647,201 +647,332 @@@ public abstract class SingleColumnRestr
          }
      }
  
 +    public static final class LikeRestriction extends SingleColumnRestriction
 +    {
 +        private static final ByteBuffer LIKE_WILDCARD = ByteBufferUtil.bytes("%");
 +        private final Operator operator;
 +        private final Term value;
 +
 +        public LikeRestriction(ColumnDefinition columnDef, Operator operator, Term value)
 +        {
 +            super(columnDef);
 +            this.operator = operator;
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public void addFunctionsTo(List<Function> functions)
 +        {
 +            value.addFunctionsTo(functions);
 +        }
 +
 +        @Override
 +        public boolean isEQ()
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        public boolean isLIKE()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public boolean canBeConvertedToMultiColumnRestriction()
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public void addRowFilterTo(RowFilter filter,
 +                                   SecondaryIndexManager indexManager,
 +                                   QueryOptions options)
 +        {
 +            Pair<Operator, ByteBuffer> operation = makeSpecific(value.bindAndGet(options));
 +
 +            // there must be a suitable INDEX for LIKE_XXX expressions
 +            RowFilter.SimpleExpression expression = filter.add(columnDef, operation.left, operation.right);
 +            indexManager.getBestIndexFor(expression)
 +                        .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns",
 +                                                          expression));
 +        }
 +
 +        @Override
 +        public MultiCBuilder appendTo(MultiCBuilder builder, QueryOptions options)
 +        {
 +            // LIKE can be used with clustering columns, but as it doesn't
 +            // represent an actual clustering value, it can't be used in a
 +            // clustering filter.
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return operator.toString();
 +        }
 +
 +        @Override
 +        public SingleRestriction doMergeWith(SingleRestriction otherRestriction)
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a %s", columnDef.name, operator);
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(Index index)
 +        {
 +            return index.supportsExpression(columnDef, operator);
 +        }
 +
 +        /**
 +         * As the specific subtype of LIKE (LIKE_PREFIX, LIKE_SUFFIX, LIKE_CONTAINS, LIKE_MATCHES) can only be
 +         * determined by examining the value, which in turn can only be known after binding, all LIKE restrictions
 +         * are initially created with the generic LIKE operator. This function takes the bound value, trims the
 +         * wildcard '%' chars from it and returns a tuple of the inferred operator subtype and the final value
 +         * @param value the bound value for the LIKE operation
 +         * @return  Pair containing the inferred LIKE subtype and the value with wildcards removed
 +         */
 +        private static Pair<Operator, ByteBuffer> makeSpecific(ByteBuffer value)
 +        {
 +            Operator operator;
 +            int beginIndex = value.position();
 +            int endIndex = value.limit() - 1;
 +            if (ByteBufferUtil.endsWith(value, LIKE_WILDCARD))
 +            {
 +                if (ByteBufferUtil.startsWith(value, LIKE_WILDCARD))
 +                {
 +                    operator = Operator.LIKE_CONTAINS;
 +                    beginIndex =+ 1;
 +                }
 +                else
 +                {
 +                    operator = Operator.LIKE_PREFIX;
 +                }
 +            }
 +            else if (ByteBufferUtil.startsWith(value, LIKE_WILDCARD))
 +            {
 +                operator = Operator.LIKE_SUFFIX;
 +                beginIndex += 1;
 +                endIndex += 1;
 +            }
 +            else
 +            {
 +                operator = Operator.LIKE_MATCHES;
 +                endIndex += 1;
 +            }
 +
 +            if (endIndex == 0 || beginIndex == endIndex)
 +                throw invalidRequest("LIKE value can't be empty.");
 +
 +            ByteBuffer newValue = value.duplicate();
 +            newValue.position(beginIndex);
 +            newValue.limit(endIndex);
 +            return Pair.create(operator, newValue);
 +        }
 +    }
++
+     /**
+      * Super Column Compatibiltiy
+      */
+ 
+     public static class SuperColumnMultiEQRestriction extends EQRestriction
+     {
+         public ByteBuffer firstValue;
+         public ByteBuffer secondValue;
+ 
+         public SuperColumnMultiEQRestriction(ColumnDefinition columnDef, Term value)
+         {
+             super(columnDef, value);
+         }
+ 
+         @Override
+         public MultiCBuilder appendTo(MultiCBuilder builder, QueryOptions options)
+         {
+             Term term = value.bind(options);
+ 
+             assert (term instanceof Tuples.Value);
+             firstValue = ((Tuples.Value)term).getElements().get(0);
+             secondValue = ((Tuples.Value)term).getElements().get(1);
+ 
+             builder.addElementToAll(firstValue);
+             checkFalse(builder.containsNull(), "Invalid null value in condition for column %s", columnDef.name);
+             checkFalse(builder.containsUnset(), "Invalid unset value for column %s", columnDef.name);
+             return builder;
+         }
+     }
+ 
+     public static class SuperColumnMultiSliceRestriction extends SliceRestriction
+     {
+         public ByteBuffer firstValue;
+         public ByteBuffer secondValue;
+ 
+         // These are here to avoid polluting SliceRestriction
+         public final Bound bound;
+         public final boolean trueInclusive;
+         public SuperColumnMultiSliceRestriction(ColumnDefinition columnDef, Bound bound, boolean inclusive, Term term)
+         {
+             super(columnDef, bound, true, term);
+             this.bound = bound;
+             this.trueInclusive = inclusive;
+ 
+         }
+ 
+         @Override
+         public MultiCBuilder appendBoundTo(MultiCBuilder builder, Bound bound, QueryOptions options)
+         {
 -            Bound b = reverseBoundIfNeeded(getFirstColumn(), bound);
++            Bound b = bound.reverseIfNeeded(getFirstColumn());
+ 
+             if (!hasBound(b))
+                 return builder;
+ 
+             Term term = slice.bound(b);
+ 
+             assert (term instanceof Tuples.Value);
+             firstValue = ((Tuples.Value)term).getElements().get(0);
+             secondValue = ((Tuples.Value)term).getElements().get(1);
+ 
+             checkBindValueSet(firstValue, "Invalid unset value for column %s", columnDef.name);
+             checkBindValueSet(secondValue, "Invalid unset value for column %s", columnDef.name);
+             return builder.addElementToAll(firstValue);
+ 
+         }
+     }
+ 
+     public static final class SuperColumnKeyEQRestriction extends EQRestriction
+     {
+         public SuperColumnKeyEQRestriction(ColumnDefinition columnDef, Term value)
+         {
+             super(columnDef, value);
+         }
+ 
+         public ByteBuffer bindValue(QueryOptions options)
+         {
+             return value.bindAndGet(options);
+         }
+ 
+         @Override
+         public MultiCBuilder appendBoundTo(MultiCBuilder builder, Bound bound, QueryOptions options)
+         {
+             // no-op
+             return builder;
+         }
+ 
+         @Override
+         public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) throws InvalidRequestException
+         {
+             // no-op
+         }
+     }
+ 
+     public static abstract class SuperColumnKeyINRestriction extends INRestriction
+     {
+         public SuperColumnKeyINRestriction(ColumnDefinition columnDef)
+         {
+             super(columnDef);
+         }
+ 
+         @Override
+         public MultiCBuilder appendTo(MultiCBuilder builder, QueryOptions options)
+         {
+             // no-op
+             return builder;
+         }
+ 
+         @Override
+         public void addRowFilterTo(RowFilter filter,
+                                    SecondaryIndexManager indexManager,
+                                    QueryOptions options) throws InvalidRequestException
+         {
+             // no-op
+         }
+ 
+         public void addFunctionsTo(List<Function> functions)
+         {
+             // no-op
+         }
+ 
+         MultiColumnRestriction toMultiColumnRestriction()
+         {
+             // no-op
+             return null;
+         }
+ 
+         public abstract List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException;
+     }
+ 
+     public static class SuperColumnKeyINRestrictionWithMarkers extends SuperColumnKeyINRestriction
+     {
+         protected final AbstractMarker marker;
+ 
+         public SuperColumnKeyINRestrictionWithMarkers(ColumnDefinition columnDef, AbstractMarker marker)
+         {
+             super(columnDef);
+             this.marker = marker;
+         }
+ 
+         public List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException
+         {
+             Terminal term = marker.bind(options);
+             checkNotNull(term, "Invalid null value for column %s", columnDef.name);
+             checkFalse(term == Constants.UNSET_VALUE, "Invalid unset value for column %s", columnDef.name);
+             Term.MultiItemTerminal lval = (Term.MultiItemTerminal) term;
+             return lval.getElements();
+         }
+     }
+ 
+     public static class SuperColumnKeyINRestrictionWithValues extends SuperColumnKeyINRestriction
+     {
+         private final List<Term> values;
+ 
+         public SuperColumnKeyINRestrictionWithValues(ColumnDefinition columnDef, List<Term> values)
+         {
+             super(columnDef);
+             this.values = values;
+         }
+ 
+         public List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException
+         {
+             List<ByteBuffer> buffers = new ArrayList<>(values.size());
+             for (Term value : values)
+                 buffers.add(value.bindAndGet(options));
+             return buffers;
+         }
+     }
+ 
+     public static class SuperColumnKeySliceRestriction extends SliceRestriction
+     {
+         // These are here to avoid polluting SliceRestriction
+         private Term term;
+ 
+         public SuperColumnKeySliceRestriction(ColumnDefinition columnDef, Bound bound, boolean inclusive, Term term)
+         {
+             super(columnDef, bound, inclusive, term);
+             this.term = term;
+         }
+ 
+         public ByteBuffer bindValue(QueryOptions options)
+         {
+             return term.bindAndGet(options);
+         }
+ 
+         @Override
+         public MultiCBuilder appendBoundTo(MultiCBuilder builder, Bound bound, QueryOptions options)
+         {
+             // no-op
+             return builder;
+         }
+ 
+         @Override
+         public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) throws InvalidRequestException
+         {
+             // no-op
+         }
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org