You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:11 UTC

[47/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index f8cd0dc..bd0ef3a 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.cql3.restrictions;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import com.google.common.base.Joiner;
 
@@ -29,10 +27,8 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompositesBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
@@ -51,12 +47,12 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
     /**
      * Creates a new <code>TokenRestriction</code> that apply to the specified columns.
      *
-     * @param ctype the composite type
+     * @param comparator the clustering comparator
      * @param columnDefs the definition of the columns to which apply the token restriction
      */
-    public TokenRestriction(CType ctype, List<ColumnDefinition> columnDefs)
+    public TokenRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs)
     {
-        super(ctype);
+        super(comparator);
         this.columnDefs = columnDefs;
     }
 
@@ -91,27 +87,25 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
     }
 
     @Override
-    public final void addIndexExpressionTo(List<IndexExpression> expressions,
-                                     SecondaryIndexManager indexManager,
-                                     QueryOptions options)
+    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
     {
         throw new UnsupportedOperationException("Index expression cannot be created for token restriction");
     }
 
     @Override
-    public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
+    public MultiCBuilder appendTo(MultiCBuilder builder, QueryOptions options)
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+    public NavigableSet<Clustering> valuesAsClustering(QueryOptions options) throws InvalidRequestException
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions options) throws InvalidRequestException
     {
         throw new UnsupportedOperationException();
     }
@@ -153,16 +147,16 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
         if (restriction instanceof PrimaryKeyRestrictions)
             return (PrimaryKeyRestrictions) restriction;
 
-        return new PrimaryKeyRestrictionSet(ctype).mergeWith(restriction);
+        return new PrimaryKeyRestrictionSet(comparator, true).mergeWith(restriction);
     }
 
-    public static final class EQ extends TokenRestriction
+    public static final class EQRestriction extends TokenRestriction
     {
         private final Term value;
 
-        public EQ(CType ctype, List<ColumnDefinition> columnDefs, Term value)
+        public EQRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Term value)
         {
-            super(ctype, columnDefs);
+            super(comparator, columnDefs);
             this.value = value;
         }
 
@@ -192,13 +186,13 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
         }
     }
 
-    public static class Slice extends TokenRestriction
+    public static class SliceRestriction extends TokenRestriction
     {
         private final TermSlice slice;
 
-        public Slice(CType ctype, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
+        public SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
         {
-            super(ctype, columnDefs);
+            super(comparator, columnDefs);
             slice = TermSlice.newInstance(bound, inclusive, term);
         }
 
@@ -246,7 +240,7 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
                 throw invalidRequest("Columns \"%s\" cannot be restricted by both an equality and an inequality relation",
                                      getColumnNamesAsString());
 
-            TokenRestriction.Slice otherSlice = (TokenRestriction.Slice) otherRestriction;
+            TokenRestriction.SliceRestriction otherSlice = (TokenRestriction.SliceRestriction) otherRestriction;
 
             if (hasBound(Bound.START) && otherSlice.hasBound(Bound.START))
                 throw invalidRequest("More than one restriction was found for the start bound on %s",
@@ -256,7 +250,7 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
                 throw invalidRequest("More than one restriction was found for the end bound on %s",
                                      getColumnNamesAsString());
 
-            return new Slice(ctype, columnDefs,  slice.merge(otherSlice.slice));
+            return new SliceRestriction(comparator, columnDefs,  slice.merge(otherSlice.slice));
         }
 
         @Override
@@ -264,10 +258,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
         {
             return String.format("SLICE%s", slice);
         }
-
-        private Slice(CType ctype, List<ColumnDefinition> columnDefs, TermSlice slice)
+        private SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, TermSlice slice)
         {
-            super(ctype, columnDefs);
+            super(comparator, columnDefs);
             this.slice = slice;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 25278df..52ff657 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -29,9 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.CounterCell;
-import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -120,9 +118,6 @@ public abstract class Selection
      */
     public boolean containsACollection()
     {
-        if (!cfm.comparator.hasCollections())
-            return false;
-
         for (ColumnDefinition def : getColumns())
             if (def.type.isCollection() && def.type.isMultiCell())
                 return true;
@@ -237,9 +232,9 @@ public abstract class Selection
         return columnMapping;
     }
 
-    public ResultSetBuilder resultSetBuilder(long now, boolean isJson) throws InvalidRequestException
+    public ResultSetBuilder resultSetBuilder(boolean isJson) throws InvalidRequestException
     {
-        return new ResultSetBuilder(now, isJson);
+        return new ResultSetBuilder(isJson);
     }
 
     public abstract boolean isAggregate();
@@ -277,18 +272,22 @@ public abstract class Selection
         List<ByteBuffer> current;
         final long[] timestamps;
         final int[] ttls;
-        final long now;
 
         private final boolean isJson;
 
-        private ResultSetBuilder(long now, boolean isJson) throws InvalidRequestException
+        private ResultSetBuilder(boolean isJson) throws InvalidRequestException
         {
             this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), new ArrayList<List<ByteBuffer>>());
             this.selectors = newSelectors();
             this.timestamps = collectTimestamps ? new long[columns.size()] : null;
             this.ttls = collectTTLs ? new int[columns.size()] : null;
-            this.now = now;
             this.isJson = isJson;
+
+            // We use Long.MIN_VALUE to indicate no timestamp and -1 to indicate no TTL
+            if (timestamps != null)
+                Arrays.fill(timestamps, Long.MIN_VALUE);
+            if (ttls != null)
+                Arrays.fill(ttls, -1);
         }
 
         public void add(ByteBuffer v)
@@ -296,25 +295,28 @@ public abstract class Selection
             current.add(v);
         }
 
-        public void add(Cell c)
+        public void add(Cell c, int nowInSec)
         {
-            current.add(isDead(c) ? null : value(c));
-            if (timestamps != null)
+            if (c == null)
             {
-                timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE : c.timestamp();
+                current.add(null);
+                return;
             }
+
+            current.add(value(c));
+
+            if (timestamps != null)
+                timestamps[current.size() - 1] = c.livenessInfo().timestamp();
+
             if (ttls != null)
-            {
-                int ttl = -1;
-                if (!isDead(c) && c instanceof ExpiringCell)
-                    ttl = c.getLocalDeletionTime() - (int) (now / 1000);
-                ttls[current.size() - 1] = ttl;
-            }
+                ttls[current.size() - 1] = c.livenessInfo().remainingTTL(nowInSec);
         }
 
-        private boolean isDead(Cell c)
+        private ByteBuffer value(Cell c)
         {
-            return c == null || !c.isLive(now);
+            return c.isCounterCell()
+                 ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
+                 : c.value();
         }
 
         public void newRow(int protocolVersion) throws InvalidRequestException
@@ -378,13 +380,6 @@ public abstract class Selection
             sb.append("}");
             return Collections.singletonList(UTF8Type.instance.getSerializer().serialize(sb.toString()));
         }
-
-        private ByteBuffer value(Cell c)
-        {
-            return (c instanceof CounterCell)
-                ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
-                : c.value();
-        }
     }
 
     private static interface Selectors

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index 9b7f0ba..4e4a220 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -59,7 +59,7 @@ public abstract class Selector implements AssignmentTestable
         {
             return new ColumnSpecification(cfm.ksName,
                                            cfm.cfName,
-                                           new ColumnIdentifier(getColumnName(), true),
+                                           ColumnIdentifier.getInterned(getColumnName(), true),
                                            getReturnType());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index db4c8dc..b6937af 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -92,12 +92,12 @@ public class AlterTableStatement extends SchemaAlteringStatement
         {
             case ADD:
                 assert columnName != null;
-                if (cfm.comparator.isDense())
+                if (cfm.isDense())
                     throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
 
                 if (isStatic)
                 {
-                    if (!cfm.comparator.isCompound())
+                    if (!cfm.isCompound())
                         throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
                     if (cfm.clusteringColumns().isEmpty())
                         throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
@@ -122,26 +122,23 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 AbstractType<?> type = validator.getType();
                 if (type.isCollection() && type.isMultiCell())
                 {
-                    if (!cfm.comparator.supportCollections())
-                        throw new InvalidRequestException("Cannot use non-frozen collections with a non-composite PRIMARY KEY");
+                    if (!cfm.isCompound())
+                        throw new InvalidRequestException("Cannot use non-frozen collections in COMPACT STORAGE tables");
                     if (cfm.isSuper())
                         throw new InvalidRequestException("Cannot use non-frozen collections with super column families");
 
-                    // If there used to be a collection column with the same name (that has been dropped), it will
-                    // still be appear in the ColumnToCollectionType because or reasons explained on #6276. The same
-                    // reason mean that we can't allow adding a new collection with that name (see the ticket for details).
-                    if (cfm.comparator.hasCollections())
-                    {
-                        CollectionType previous = cfm.comparator.collectionType() == null ? null : cfm.comparator.collectionType().defined.get(columnName.bytes);
-                        if (previous != null && !type.isCompatibleWith(previous))
-                            throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
-                                        "because a collection with the same name and a different type has already been used in the past", columnName));
-                    }
-
-                    cfm.comparator = cfm.comparator.addOrUpdateCollection(columnName, (CollectionType)type);
+                    // If there used to be a collection column with the same name (that has been dropped), we could still have
+                    // some data using the old type, and so we can't allow adding a collection with the same name unless
+                    // the types are compatible (see #6276).
+                    CFMetaData.DroppedColumn dropped = cfm.getDroppedColumns().get(columnName);
+                    // We could have type == null for old dropped columns, in which case we play it safe and refuse the addition
+                    if (dropped != null && (dropped.type == null || (dropped.type instanceof CollectionType && !type.isCompatibleWith(dropped.type))))
+                        throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
+                                    "because a collection with the same name and a different type%s has already been used in the past",
+                                    columnName, dropped.type == null ? "" : " (" + dropped.type.asCQL3Type() + ")"));
                 }
 
-                Integer componentIndex = cfm.comparator.isCompound() ? cfm.comparator.clusteringPrefixSize() : null;
+                Integer componentIndex = cfm.isCompound() ? cfm.comparator.size() : null;
                 cfm.addColumnDefinition(isStatic
                                         ? ColumnDefinition.staticDef(cfm, columnName.bytes, type, componentIndex)
                                         : ColumnDefinition.regularDef(cfm, columnName.bytes, type, componentIndex));
@@ -158,28 +155,13 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     case PARTITION_KEY:
                         if (validatorType instanceof CounterColumnType)
                             throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", columnName));
-                        if (cfm.getKeyValidator() instanceof CompositeType)
-                        {
-                            List<AbstractType<?>> oldTypes = ((CompositeType) cfm.getKeyValidator()).types;
-                            if (!validatorType.isValueCompatibleWith(oldTypes.get(def.position())))
-                                throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
-                                                                               columnName,
-                                                                               oldTypes.get(def.position()).asCQL3Type(),
-                                                                               validator));
 
-                            List<AbstractType<?>> newTypes = new ArrayList<AbstractType<?>>(oldTypes);
-                            newTypes.set(def.position(), validatorType);
-                            cfm.keyValidator(CompositeType.getInstance(newTypes));
-                        }
-                        else
-                        {
-                            if (!validatorType.isValueCompatibleWith(cfm.getKeyValidator()))
-                                throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
-                                                                               columnName,
-                                                                               cfm.getKeyValidator().asCQL3Type(),
-                                                                               validator));
-                            cfm.keyValidator(validatorType);
-                        }
+                        AbstractType<?> currentType = cfm.getKeyValidatorAsClusteringComparator().subtype(def.position());
+                        if (!validatorType.isValueCompatibleWith(currentType))
+                            throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
+                                                                           columnName,
+                                                                           currentType.asCQL3Type(),
+                                                                           validator));
                         break;
                     case CLUSTERING_COLUMN:
                         AbstractType<?> oldType = cfm.comparator.subtype(def.position());
@@ -192,16 +174,6 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                                            oldType.asCQL3Type(),
                                                                            validator));
 
-                        cfm.comparator = cfm.comparator.setSubtype(def.position(), validatorType);
-                        break;
-                    case COMPACT_VALUE:
-                        // See below
-                        if (!validatorType.isValueCompatibleWith(cfm.getDefaultValidator()))
-                            throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
-                                                                           columnName,
-                                                                           cfm.getDefaultValidator().asCQL3Type(),
-                                                                           validator));
-                        cfm.defaultValidator(validatorType);
                         break;
                     case REGULAR:
                     case STATIC:
@@ -215,14 +187,6 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                                            columnName,
                                                                            def.type.asCQL3Type(),
                                                                            validator));
-
-                        // For collections, if we alter the type, we need to update the comparator too since it includes
-                        // the type too (note that isValueCompatibleWith above has validated that the new type doesn't
-                        // change the underlying sorting order, but we still don't want to have a discrepancy between the type
-                        // in the comparator and the one in the ColumnDefinition as that would be dodgy).
-                        if (validatorType.isCollection() && validatorType.isMultiCell())
-                            cfm.comparator = cfm.comparator.addOrUpdateCollection(def.name, (CollectionType)validatorType);
-
                         break;
                 }
                 // In any case, we update the column definition
@@ -231,7 +195,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
             case DROP:
                 assert columnName != null;
-                if (!cfm.isCQL3Table())
+                if (!cfm.isCQLTable())
                     throw new InvalidRequestException("Cannot drop columns from a non-CQL3 table");
                 if (def == null)
                     throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
@@ -244,7 +208,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     case REGULAR:
                     case STATIC:
                         ColumnDefinition toDelete = null;
-                        for (ColumnDefinition columnDef : cfm.regularAndStaticColumns())
+                        for (ColumnDefinition columnDef : cfm.partitionColumns())
                         {
                             if (columnDef.name.equals(columnName))
                             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index 74fafd6..4e78bfc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -23,7 +23,7 @@ import java.util.*;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
@@ -150,28 +150,6 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
         // We need to update this validator ...
         cfm.addOrReplaceColumnDefinition(def.withNewType(t));
-
-        // ... but if it's part of the comparator or key validator, we need to go update those too.
-        switch (def.kind)
-        {
-            case PARTITION_KEY:
-                cfm.keyValidator(updateWith(cfm.getKeyValidator(), keyspace, toReplace, updated));
-                break;
-            case CLUSTERING_COLUMN:
-                cfm.comparator = CellNames.fromAbstractType(updateWith(cfm.comparator.asAbstractType(), keyspace, toReplace, updated), cfm.comparator.isDense());
-                break;
-            default:
-                // If it's a collection, we still want to modify the comparator because the collection is aliased in it
-                if (def.type instanceof CollectionType && def.type.isMultiCell())
-                {
-                    t = updateWith(cfm.comparator.asAbstractType(), keyspace, toReplace, updated);
-                    // If t == null, all relevant comparators were updated via updateWith, which reaches into types and
-                    // collections
-                    if (t != null)
-                        cfm.comparator = CellNames.fromAbstractType(t, cfm.comparator.isDense());
-                }
-                break;
-        }
         return true;
     }
 
@@ -203,23 +181,6 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
             List<AbstractType<?>> updatedTypes = updateTypes(ct.types, keyspace, toReplace, updated);
             return updatedTypes == null ? null : CompositeType.getInstance(updatedTypes);
         }
-        else if (type instanceof ColumnToCollectionType)
-        {
-            ColumnToCollectionType ctct = (ColumnToCollectionType)type;
-            Map<ByteBuffer, CollectionType> updatedTypes = null;
-            for (Map.Entry<ByteBuffer, CollectionType> entry : ctct.defined.entrySet())
-            {
-                AbstractType<?> t = updateWith(entry.getValue(), keyspace, toReplace, updated);
-                if (t == null)
-                    continue;
-
-                if (updatedTypes == null)
-                    updatedTypes = new HashMap<>(ctct.defined);
-
-                updatedTypes.put(entry.getKey(), (CollectionType)t);
-            }
-            return updatedTypes == null ? null : ColumnToCollectionType.getInstance(updatedTypes);
-        }
         else if (type instanceof CollectionType)
         {
             if (type instanceof ListType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 0661b56..08a47c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -32,7 +32,8 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
@@ -40,6 +41,7 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
@@ -57,6 +59,10 @@ public class BatchStatement implements CQLStatement
     private final int boundTerms;
     public final Type type;
     private final List<ModificationStatement> statements;
+    private final PartitionColumns updatedColumns;
+    private final PartitionColumns conditionColumns;
+    private final boolean updatesRegularRows;
+    private final boolean updatesStaticRow;
     private final Attributes attrs;
     private final boolean hasConditions;
     private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
@@ -72,14 +78,33 @@ public class BatchStatement implements CQLStatement
      */
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
-        boolean hasConditions = false;
-        for (ModificationStatement statement : statements)
-            hasConditions |= statement.hasConditions();
-
         this.boundTerms = boundTerms;
         this.type = type;
         this.statements = statements;
         this.attrs = attrs;
+
+        boolean hasConditions = false;
+        PartitionColumns.Builder regularBuilder = PartitionColumns.builder();
+        PartitionColumns.Builder conditionBuilder = PartitionColumns.builder();
+        boolean updateRegular = false;
+        boolean updateStatic = false;
+
+        for (ModificationStatement stmt : statements)
+        {
+            regularBuilder.addAll(stmt.updatedColumns());
+            updateRegular |= stmt.updatesRegularRows();
+            if (stmt.hasConditions())
+            {
+                hasConditions = true;
+                conditionBuilder.addAll(stmt.conditionColumns());
+                updateStatic |= stmt.updatesStaticRow();
+            }
+        }
+
+        this.updatedColumns = regularBuilder.build();
+        this.conditionColumns = conditionBuilder.build();
+        this.updatesRegularRows = updateRegular;
+        this.updatesStaticRow = updateStatic;
         this.hasConditions = hasConditions;
     }
 
@@ -199,6 +224,18 @@ public class BatchStatement implements CQLStatement
         return ms;
     }
 
+    private PartitionColumns updatedColumns()
+    {
+        return updatedColumns;
+    }
+
+    private int updatedRows()
+    {
+        // Note: two statements may actually apply to the same row, but this is only an estimate
+        // used to size the PartitionUpdate backing array, so it's good enough.
+        return statements.size();
+    }
+
     private void addStatementMutations(ModificationStatement statement,
                                        QueryOptions options,
                                        boolean local,
@@ -218,25 +255,33 @@ public class BatchStatement implements CQLStatement
         // we don't want to recreate mutations every time as this is particularly inefficient when applying
         // multiple batch to the same partition (see #6737).
         List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
-        Composite clusteringPrefix = statement.createClusteringPrefix(options);
-        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
+        CBuilder clustering = statement.createClustering(options);
+        UpdateParameters params = statement.makeUpdateParameters(keys, clustering, options, local, now);
 
         for (ByteBuffer key : keys)
         {
-            IMutation mutation = ksMap.get(key);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            IMutation mutation = ksMap.get(dk.getKey());
             Mutation mut;
             if (mutation == null)
             {
-                mut = new Mutation(ksName, key);
+                mut = new Mutation(ksName, dk);
                 mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
-                ksMap.put(key, mutation);
+                ksMap.put(dk.getKey(), mutation);
             }
             else
             {
                 mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
             }
 
-            statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
+            PartitionUpdate upd = mut.get(statement.cfm);
+            if (upd == null)
+            {
+                upd = new PartitionUpdate(statement.cfm, dk, updatedColumns(), updatedRows());
+                mut.add(upd);
+            }
+
+            statement.addUpdateForKey(upd, clustering, params);
         }
     }
 
@@ -245,56 +290,55 @@ public class BatchStatement implements CQLStatement
      *
      * @param cfs ColumnFamilies that will store the batch's mutations.
      */
-    public static void verifyBatchSize(Iterable<ColumnFamily> cfs) throws InvalidRequestException
+    public static void verifyBatchSize(Iterable<PartitionUpdate> updates) throws InvalidRequestException
     {
         long size = 0;
         long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
         long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold();
 
-        for (ColumnFamily cf : cfs)
-            size += cf.dataSize();
+        for (PartitionUpdate update : updates)
+            size += update.dataSize();
 
         if (size > warnThreshold)
         {
-            Set<String> ksCfPairs = new HashSet<>();
-            for (ColumnFamily cf : cfs)
-                ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+            Set<String> tableNames = new HashSet<>();
+            for (PartitionUpdate update : updates)
+                tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
 
             String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
             if (size > failThreshold)
             {
-                Tracing.trace(format, ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
-                logger.error(format, ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
+                Tracing.trace(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
+                logger.error(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
                 throw new InvalidRequestException("Batch too large");
             }
             else if (logger.isWarnEnabled())
             {
-                logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
+                logger.warn(format, tableNames, size, warnThreshold, size - warnThreshold, "");
             }
-            ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {ksCfPairs, size, warnThreshold, size - warnThreshold, ""}).getMessage());
+            ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage());
         }
     }
 
-    private void verifyBatchType(Collection<? extends IMutation> mutations)
+    private void verifyBatchType(Iterable<PartitionUpdate> updates)
     {
-        if (type != Type.LOGGED && mutations.size() > 1)
+        if (type != Type.LOGGED && Iterables.size(updates) > 1)
         {
-            Set<String> ksCfPairs = new HashSet<>();
-            Set<ByteBuffer> keySet = new HashSet<>();
+            Set<DecoratedKey> keySet = new HashSet<>();
+            Set<String> tableNames = new HashSet<>();
 
-            for (IMutation im : mutations)
+            for (PartitionUpdate update : updates)
             {
-                keySet.add(im.key());
-                for (ColumnFamily cf : im.getColumnFamilies())
-                    ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+                keySet.add(update.partitionKey());
+                tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
             }
 
             NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
                              keySet.size(), keySet.size() == 1 ? "" : "s",
-                             ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
+                             tableNames.size() == 1 ? "" : "s", tableNames);
 
             ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
-                                                    ksCfPairs.size() == 1 ? "" : "s", ksCfPairs}).getMessage());
+                                                    tableNames.size() == 1 ? "" : "s", tableNames}).getMessage());
 
         }
     }
@@ -326,17 +370,17 @@ public class BatchStatement implements CQLStatement
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
-        // Extract each collection of cfs from it's IMutation and then lazily concatenate all of them into a single Iterable.
-        Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>()
+        // Extract each collection of updates from its IMutation and then lazily concatenate all of them into a single Iterable.
+        Iterable<PartitionUpdate> updates = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<PartitionUpdate>>()
         {
-            public Collection<ColumnFamily> apply(IMutation im)
+            public Collection<PartitionUpdate> apply(IMutation im)
             {
-                return im.getColumnFamilies();
+                return im.getPartitionUpdates();
             }
         }));
 
-        verifyBatchSize(cfs);
-        verifyBatchType(mutations);
+        verifyBatchSize(updates);
+        verifyBatchType(updates);
 
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
@@ -349,27 +393,26 @@ public class BatchStatement implements CQLStatement
         CQL3CasRequest casRequest = p.left;
         Set<ColumnDefinition> columnsWithConditions = p.right;
 
-        ColumnFamily result = StorageProxy.cas(casRequest.cfm.ksName,
-                                               casRequest.cfm.cfName,
-                                               casRequest.key,
-                                               casRequest,
-                                               options.getSerialConsistency(),
-                                               options.getConsistency(),
-                                               state.getClientState());
-
-        return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(casRequest.cfm.ksName,
-                                                                              casRequest.key,
-                                                                              casRequest.cfm.cfName,
-                                                                              result,
-                                                                              columnsWithConditions,
-                                                                              true,
-                                                                              options.forStatement(0)));
+        String ksName = casRequest.cfm.ksName;
+        String tableName = casRequest.cfm.cfName;
+
+        try (RowIterator result = StorageProxy.cas(ksName,
+                                                   tableName,
+                                                   casRequest.key,
+                                                   casRequest,
+                                                   options.getSerialConsistency(),
+                                                   options.getConsistency(),
+                                                   state.getClientState()))
+        {
+            return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0)));
+        }
     }
 
+
     private Pair<CQL3CasRequest,Set<ColumnDefinition>> makeCasRequest(BatchQueryOptions options, QueryState state)
     {
         long now = state.getTimestamp();
-        ByteBuffer key = null;
+        DecoratedKey key = null;
         CQL3CasRequest casRequest = null;
         Set<ColumnDefinition> columnsWithConditions = new LinkedHashSet<>();
 
@@ -383,25 +426,25 @@ public class BatchStatement implements CQLStatement
                 throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
             if (key == null)
             {
-                key = pks.get(0);
-                casRequest = new CQL3CasRequest(statement.cfm, key, true);
+                key = StorageService.getPartitioner().decorateKey(pks.get(0));
+                casRequest = new CQL3CasRequest(statement.cfm, key, true, conditionColumns, updatesRegularRows, updatesStaticRow);
             }
-            else if (!key.equals(pks.get(0)))
+            else if (!key.getKey().equals(pks.get(0)))
             {
                 throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
             }
 
-            Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions);
+            CBuilder cbuilder = statement.createClustering(statementOptions);
             if (statement.hasConditions())
             {
-                statement.addConditions(clusteringPrefix, casRequest, statementOptions);
+                statement.addConditions(cbuilder.build(), casRequest, statementOptions);
                 // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition())
                     columnsWithConditions = null;
                 else if (columnsWithConditions != null)
                     Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions());
             }
-            casRequest.addRowUpdate(clusteringPrefix, statement, statementOptions, timestamp);
+            casRequest.addRowUpdate(cbuilder, statement, statementOptions, timestamp);
         }
 
         return Pair.create(casRequest, columnsWithConditions);
@@ -436,15 +479,13 @@ public class BatchStatement implements CQLStatement
         CQL3CasRequest request = p.left;
         Set<ColumnDefinition> columnsWithConditions = p.right;
 
-        ColumnFamily result = ModificationStatement.casInternal(request, state);
+        String ksName = request.cfm.ksName;
+        String tableName = request.cfm.cfName;
 
-        return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(request.cfm.ksName,
-                                                                              request.key,
-                                                                              request.cfm.cfName,
-                                                                              result,
-                                                                              columnsWithConditions,
-                                                                              true,
-                                                                              options.forStatement(0)));
+        try (RowIterator result = ModificationStatement.casInternal(request, state))
+        {
+            return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0)));
+        }
     }
 
     public interface BatchVariables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 081a14e..9352930 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -25,8 +25,8 @@ import com.google.common.collect.Multimap;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.utils.Pair;
@@ -36,37 +36,45 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CQL3CasRequest implements CASRequest
 {
-    final CFMetaData cfm;
-    final ByteBuffer key;
-    final long now;
-    final boolean isBatch;
-
-    // We index RowCondition by the prefix of the row they applied to for 2 reasons:
+    public final CFMetaData cfm;
+    public final DecoratedKey key;
+    public final boolean isBatch;
+    private final PartitionColumns conditionColumns;
+    private final boolean updatesRegularRows;
+    private final boolean updatesStaticRow;
+    private boolean hasExists; // whether we have an IF EXISTS or an IF NOT EXISTS condition
+
+    // We index RowCondition by the clustering of the row they applied to for 2 reasons:
     //   1) this allows to keep things sorted to build the ColumnSlice array below
     //   2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row)
-    private final SortedMap<Composite, RowCondition> conditions;
+    private final SortedMap<Clustering, RowCondition> conditions;
 
     private final List<RowUpdate> updates = new ArrayList<>();
 
-    public CQL3CasRequest(CFMetaData cfm, ByteBuffer key, boolean isBatch)
+    public CQL3CasRequest(CFMetaData cfm,
+                          DecoratedKey key,
+                          boolean isBatch,
+                          PartitionColumns conditionColumns,
+                          boolean updatesRegularRows,
+                          boolean updatesStaticRow)
     {
         this.cfm = cfm;
-        // When checking if conditions apply, we want to use a fixed reference time for a whole request to check
-        // for expired cells. Note that this is unrelated to the cell timestamp.
-        this.now = System.currentTimeMillis();
         this.key = key;
         this.conditions = new TreeMap<>(cfm.comparator);
         this.isBatch = isBatch;
+        this.conditionColumns = conditionColumns;
+        this.updatesRegularRows = updatesRegularRows;
+        this.updatesStaticRow = updatesStaticRow;
     }
 
-    public void addRowUpdate(Composite prefix, ModificationStatement stmt, QueryOptions options, long timestamp)
+    public void addRowUpdate(CBuilder cbuilder, ModificationStatement stmt, QueryOptions options, long timestamp)
     {
-        updates.add(new RowUpdate(prefix, stmt, options, timestamp));
+        updates.add(new RowUpdate(cbuilder, stmt, options, timestamp));
     }
 
-    public void addNotExist(Composite prefix) throws InvalidRequestException
+    public void addNotExist(Clustering clustering) throws InvalidRequestException
     {
-        RowCondition previous = conditions.put(prefix, new NotExistCondition(prefix, now));
+        RowCondition previous = conditions.put(clustering, new NotExistCondition(clustering));
         if (previous != null && !(previous instanceof NotExistCondition))
         {
             // these should be prevented by the parser, but it doesn't hurt to check
@@ -75,23 +83,25 @@ public class CQL3CasRequest implements CASRequest
             else
                 throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
         }
+        hasExists = true;
     }
 
-    public void addExist(Composite prefix) throws InvalidRequestException
+    public void addExist(Clustering clustering) throws InvalidRequestException
     {
-        RowCondition previous = conditions.put(prefix, new ExistCondition(prefix, now));
+        RowCondition previous = conditions.put(clustering, new ExistCondition(clustering));
         // this should be prevented by the parser, but it doesn't hurt to check
         if (previous instanceof NotExistCondition)
             throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row");
+        hasExists = true;
     }
 
-    public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
+    public void addConditions(Clustering clustering, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
     {
-        RowCondition condition = conditions.get(prefix);
+        RowCondition condition = conditions.get(clustering);
         if (condition == null)
         {
-            condition = new ColumnsConditions(prefix, now);
-            conditions.put(prefix, condition);
+            condition = new ColumnsConditions(clustering);
+            conditions.put(clustering, condition);
         }
         else if (!(condition instanceof ColumnsConditions))
         {
@@ -100,24 +110,43 @@ public class CQL3CasRequest implements CASRequest
         ((ColumnsConditions)condition).addConditions(conds, options);
     }
 
-    public IDiskAtomFilter readFilter()
+    private PartitionColumns columnsToRead()
+    {
+        // If all our conditions are columns conditions (IF x = ?), then it's enough to query
+        // the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however,
+        // we need to query all columns for the row since if the condition fails, we want to
+        // return everything to the user. Static columns make this a bit more complex, in that
+        // if an insert only sets static columns, then the existence condition applies only to the
+        // static columns themselves, and so we don't want to include regular columns in that
+        // case.
+        if (hasExists)
+        {
+            PartitionColumns allColumns = cfm.partitionColumns();
+            Columns statics = updatesStaticRow ? allColumns.statics : Columns.NONE;
+            Columns regulars = updatesRegularRows ? allColumns.regulars : Columns.NONE;
+            return new PartitionColumns(statics, regulars);
+        }
+        return conditionColumns;
+    }
+
+    public SinglePartitionReadCommand readCommand(int nowInSec)
     {
         assert !conditions.isEmpty();
-        ColumnSlice[] slices = new ColumnSlice[conditions.size()];
-        int i = 0;
+        Slices.Builder builder = new Slices.Builder(cfm.comparator, conditions.size());
         // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists
         // but all values for which there were conditions are null" and "row doesn't exists", and we can't rely on the
         // row marker for that (see #6623)
-        for (Composite prefix : conditions.keySet())
-            slices[i++] = prefix.slice();
+        for (Clustering clustering : conditions.keySet())
+        {
+            if (clustering != Clustering.STATIC_CLUSTERING)
+                builder.add(Slice.make(clustering));
+        }
 
-        int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
-        slices = ColumnSlice.deoverlapSlices(slices, cfm.comparator);
-        assert ColumnSlice.validateSlices(slices, cfm.comparator, false);
-        return new SliceQueryFilter(slices, false, slices.length, toGroup);
+        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(builder.build(), false);
+        return SinglePartitionReadCommand.create(cfm, nowInSec, key, ColumnFilter.selection(columnsToRead()), filter);
     }
 
-    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+    public boolean appliesTo(FilteredPartition current) throws InvalidRequestException
     {
         for (RowCondition condition : conditions.values())
         {
@@ -127,16 +156,24 @@ public class CQL3CasRequest implements CASRequest
         return true;
     }
 
-    public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException
+    private PartitionColumns updatedColumns()
+    {
+        PartitionColumns.Builder builder = PartitionColumns.builder();
+        for (RowUpdate upd : updates)
+            builder.addAll(upd.stmt.updatedColumns());
+        return builder.build();
+    }
+
+    public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException
     {
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
+        PartitionUpdate update = new PartitionUpdate(cfm, key, updatedColumns(), conditions.size());
         for (RowUpdate upd : updates)
-            upd.applyUpdates(current, cf);
+            upd.applyUpdates(current, update);
 
         if (isBatch)
-            BatchStatement.verifyBatchSize(Collections.singleton(cf));
+            BatchStatement.verifyBatchSize(Collections.singleton(update));
 
-        return cf;
+        return update;
     }
 
     /**
@@ -147,89 +184,62 @@ public class CQL3CasRequest implements CASRequest
      */
     private class RowUpdate
     {
-        private final Composite rowPrefix;
+        private final CBuilder cbuilder;
         private final ModificationStatement stmt;
         private final QueryOptions options;
         private final long timestamp;
 
-        private RowUpdate(Composite rowPrefix, ModificationStatement stmt, QueryOptions options, long timestamp)
+        private RowUpdate(CBuilder cbuilder, ModificationStatement stmt, QueryOptions options, long timestamp)
         {
-            this.rowPrefix = rowPrefix;
+            this.cbuilder = cbuilder;
             this.stmt = stmt;
             this.options = options;
             this.timestamp = timestamp;
         }
 
-        public void applyUpdates(ColumnFamily current, ColumnFamily updates) throws InvalidRequestException
+        public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
         {
-            Map<ByteBuffer, CQL3Row> map = null;
-            if (stmt.requiresRead())
-            {
-                // Uses the "current" values read by Paxos for lists operation that requires a read
-                Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(current.iterator(new ColumnSlice[]{ rowPrefix.slice() }));
-                if (iter.hasNext())
-                {
-                    map = Collections.singletonMap(key, iter.next());
-                    assert !iter.hasNext() : "We shoudn't be updating more than one CQL row per-ModificationStatement";
-                }
-            }
-
-            UpdateParameters params = new UpdateParameters(cfm, options, timestamp, stmt.getTimeToLive(options), map);
-            stmt.addUpdateForKey(updates, key, rowPrefix, params);
+            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
+            UpdateParameters params = new UpdateParameters(cfm, options, timestamp, stmt.getTimeToLive(options), map, true);
+            stmt.addUpdateForKey(updates, cbuilder, params);
         }
     }
 
     private static abstract class RowCondition
     {
-        public final Composite rowPrefix;
-        protected final long now;
+        public final Clustering clustering;
 
-        protected RowCondition(Composite rowPrefix, long now)
+        protected RowCondition(Clustering clustering)
         {
-            this.rowPrefix = rowPrefix;
-            this.now = now;
+            this.clustering = clustering;
         }
 
-        public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+        public abstract boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
     }
 
     private static class NotExistCondition extends RowCondition
     {
-        private NotExistCondition(Composite rowPrefix, long now)
+        private NotExistCondition(Clustering clustering)
         {
-            super(rowPrefix, now);
+            super(clustering);
         }
 
-        public boolean appliesTo(ColumnFamily current)
+        public boolean appliesTo(FilteredPartition current)
         {
-            if (current == null)
-                return true;
-
-            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() });
-            while (iter.hasNext())
-                if (iter.next().isLive(now))
-                    return false;
-            return true;
+            return current == null || current.getRow(clustering) == null;
         }
     }
 
     private static class ExistCondition extends RowCondition
     {
-        private ExistCondition(Composite rowPrefix, long now)
+        private ExistCondition(Clustering clustering)
         {
-            super (rowPrefix, now);
+            super(clustering);
         }
 
-        public boolean appliesTo(ColumnFamily current)
+        public boolean appliesTo(FilteredPartition current)
         {
-            if (current == null)
-                return false;
-
-            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() });
-            while (iter.hasNext())
-                if (iter.next().isLive(now))
-                    return true;
-            return false;
+            return current != null && current.getRow(clustering) != null;
         }
     }
 
@@ -237,9 +247,9 @@ public class CQL3CasRequest implements CASRequest
     {
         private final Multimap<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.Bound> conditions = HashMultimap.create();
 
-        private ColumnsConditions(Composite rowPrefix, long now)
+        private ColumnsConditions(Clustering clustering)
         {
-            super(rowPrefix, now);
+            super(clustering);
         }
 
         public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
@@ -251,14 +261,16 @@ public class CQL3CasRequest implements CASRequest
             }
         }
 
-        public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+        public boolean appliesTo(FilteredPartition current) throws InvalidRequestException
         {
             if (current == null)
                 return conditions.isEmpty();
 
             for (ColumnCondition.Bound condition : conditions.values())
-                if (!condition.appliesTo(rowPrefix, current, now))
+            {
+                if (!condition.appliesTo(current.getRow(clustering)))
                     return false;
+            }
             return true;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index c3b0993..cc808ac 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -111,12 +111,14 @@ public class CreateIndexStatement extends SchemaAlteringStatement
 
         properties.validate();
 
-        // TODO: we could lift that limitation
-        if ((cfm.comparator.isDense() || !cfm.comparator.isCompound()) && cd.isPrimaryKeyColumn())
-            throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
-
-        if (cd.kind == ColumnDefinition.Kind.COMPACT_VALUE)
-            throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
+        if (cfm.isCompactTable())
+        {
+            if (!cfm.isStaticCompactTable())
+                throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
+            else if (cd.isPrimaryKeyColumn())
+                // TODO: we could lift that limitation
+                throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
+        }
 
         // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
         // CompositesIndex) and maybe we should, but that means a query like:
@@ -124,7 +126,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
         // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
         // such indexing is actually useful.
-        if (cd.isStatic())
+        if (!cfm.isCompactTable() && cd.isStatic())
             throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
 
         if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
@@ -174,7 +176,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         {
             cd.setIndexType(IndexType.CUSTOM, properties.getOptions());
         }
-        else if (cfm.comparator.isCompound())
+        else if (cfm.isCompound())
         {
             Map<String, String> options = Collections.emptyMap();
             // For now, we only allow indexing values for collections, but we could later allow

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index b3591a2..8602af1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -26,11 +26,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -43,15 +40,18 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 /** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
 public class CreateTableStatement extends SchemaAlteringStatement
 {
-    public CellNameType comparator;
-    private AbstractType<?> defaultValidator;
-    private AbstractType<?> keyValidator;
+    private List<AbstractType<?>> keyTypes;
+    private List<AbstractType<?>> clusteringTypes;
 
-    private final List<ByteBuffer> keyAliases = new ArrayList<ByteBuffer>();
-    private final List<ByteBuffer> columnAliases = new ArrayList<ByteBuffer>();
+    private Map<ByteBuffer, CollectionType> collections = new HashMap<>();
+
+    private final List<ColumnIdentifier> keyAliases = new ArrayList<>();
+    private final List<ColumnIdentifier> columnAliases = new ArrayList<>();
     private ByteBuffer valueAlias;
 
     private boolean isDense;
+    private boolean isCompound;
+    private boolean hasCounters;
 
     // use a TreeMap to preserve ordering across JDK versions (see CASSANDRA-9492)
     private final Map<ColumnIdentifier, AbstractType> columns = new TreeMap<>(new Comparator<ColumnIdentifier>()
@@ -90,22 +90,6 @@ public class CreateTableStatement extends SchemaAlteringStatement
         // validated in announceMigration()
     }
 
-    // Column definitions
-    private List<ColumnDefinition> getColumns(CFMetaData cfm)
-    {
-        List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
-        Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
-        for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
-        {
-            ColumnIdentifier id = col.getKey();
-            columnDefs.add(staticColumns.contains(id)
-                           ? ColumnDefinition.staticDef(cfm, col.getKey().bytes, col.getValue(), componentIndex)
-                           : ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
-        }
-
-        return columnDefs;
-    }
-
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
         try
@@ -142,6 +126,46 @@ public class CreateTableStatement extends SchemaAlteringStatement
         }
     }
 
+    public CFMetaData.Builder metadataBuilder()
+    {
+        CFMetaData.Builder builder = CFMetaData.Builder.create(keyspace(), columnFamily(), isDense, isCompound, hasCounters);
+        for (int i = 0; i < keyAliases.size(); i++)
+            builder.addPartitionKey(keyAliases.get(i), keyTypes.get(i));
+        for (int i = 0; i < columnAliases.size(); i++)
+            builder.addClusteringColumn(columnAliases.get(i), clusteringTypes.get(i));
+
+        boolean isStaticCompact = !isDense && !isCompound;
+        for (Map.Entry<ColumnIdentifier, AbstractType> entry : columns.entrySet())
+        {
+            ColumnIdentifier name = entry.getKey();
+            // Note that when the table is neither dense nor compound (the "static" compact storage case), every declared column is added as a static column
+            if (staticColumns.contains(name) || isStaticCompact)
+                builder.addStaticColumn(name, entry.getValue());
+            else
+                builder.addRegularColumn(name, entry.getValue());
+        }
+
+        boolean isCompactTable = isDense || !isCompound;
+        if (isCompactTable)
+        {
+            CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(builder.usedColumnNames());
+            // Compact tables always have a clustering column and a single regular value column; add whichever is missing under a default name.
+            if (isStaticCompact)
+            {
+                builder.addClusteringColumn(names.defaultClusteringName(), UTF8Type.instance);
+                builder.addRegularColumn(names.defaultCompactValueName(), hasCounters ? CounterColumnType.instance : BytesType.instance);
+            }
+            else if (isDense && !builder.hasRegulars())
+            {
+                // Even a dense table might not have its regular column if none was part of the declaration. If
+                // that's the case, add one typed with the dedicated EmptyType so we can recognize that case later
+                builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
+            }
+        }
+
+        return builder;
+    }
+
     /**
      * Returns a CFMetaData instance based on the parameters parsed from this
      * <code>CREATE</code> statement, or defaults where applicable.
@@ -151,48 +175,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
      */
     public CFMetaData getCFMetaData() throws RequestValidationException
     {
-        CFMetaData newCFMD;
-        newCFMD = new CFMetaData(keyspace(),
-                                 columnFamily(),
-                                 ColumnFamilyType.Standard,
-                                 comparator);
+        CFMetaData newCFMD = metadataBuilder().build();
         applyPropertiesTo(newCFMD);
         return newCFMD;
     }
 
     public void applyPropertiesTo(CFMetaData cfmd) throws RequestValidationException
     {
-        cfmd.defaultValidator(defaultValidator)
-            .keyValidator(keyValidator)
-            .addAllColumnDefinitions(getColumns(cfmd))
-            .isDense(isDense);
-
-        addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
-        addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
-        if (valueAlias != null)
-            addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
-
         properties.applyToCFMetadata(cfmd);
     }
 
-    private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
-    {
-        if (comparator instanceof CompositeType)
-        {
-            CompositeType ct = (CompositeType)comparator;
-            for (int i = 0; i < aliases.size(); ++i)
-                if (aliases.get(i) != null)
-                    cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
-        }
-        else
-        {
-            assert aliases.size() <= 1;
-            if (!aliases.isEmpty() && aliases.get(0) != null)
-                cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind));
-        }
-    }
-
-
     public static class RawStatement extends CFStatement
     {
         private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
@@ -233,169 +225,107 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
             CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
 
-            boolean hasCounters = false;
-            Map<ByteBuffer, CollectionType> definedMultiCellCollections = null;
             for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
             {
                 ColumnIdentifier id = entry.getKey();
                 CQL3Type pt = entry.getValue().prepare(keyspace());
-                if (pt.isCollection() && ((CollectionType) pt.getType()).isMultiCell())
-                {
-                    if (definedMultiCellCollections == null)
-                        definedMultiCellCollections = new HashMap<>();
-                    definedMultiCellCollections.put(id.bytes, (CollectionType) pt.getType());
-                }
-                else if (entry.getValue().isCounter())
-                    hasCounters = true;
-
+                if (pt.isCollection() && ((CollectionType)pt.getType()).isMultiCell())
+                    stmt.collections.put(id.bytes, (CollectionType)pt.getType());
+                if (entry.getValue().isCounter())
+                    stmt.hasCounters = true;
                 stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
             }
 
             if (keyAliases.isEmpty())
                 throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
-            else if (keyAliases.size() > 1)
+            if (keyAliases.size() > 1)
                 throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
-            else if (hasCounters && properties.getDefaultTimeToLive() > 0)
+            if (stmt.hasCounters && properties.getDefaultTimeToLive() > 0)
                 throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters");
 
             List<ColumnIdentifier> kAliases = keyAliases.get(0);
-
-            List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
+            stmt.keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
             for (ColumnIdentifier alias : kAliases)
             {
-                stmt.keyAliases.add(alias.bytes);
+                stmt.keyAliases.add(alias);
                 AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
                 if (t instanceof CounterColumnType)
                     throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
                 if (staticColumns.contains(alias))
                     throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
-                keyTypes.add(t);
+                stmt.keyTypes.add(t);
             }
-            stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
-
-            // Dense means that no part of the comparator stores a CQL column name. This means
-            // COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
-            stmt.isDense = useCompactStorage && !columnAliases.isEmpty();
 
+            stmt.clusteringTypes = new ArrayList<>(columnAliases.size());
             // Handle column aliases
-            if (columnAliases.isEmpty())
+            for (ColumnIdentifier t : columnAliases)
             {
-                if (useCompactStorage)
-                {
-                    // There should remain some column definition since it is a non-composite "static" CF
-                    if (stmt.columns.isEmpty())
-                        throw new InvalidRequestException("No definition found that is not part of the PRIMARY KEY");
-
-                    if (definedMultiCellCollections != null)
-                        throw new InvalidRequestException("Non-frozen collection types are not supported with COMPACT STORAGE");
-
-                    stmt.comparator = new SimpleSparseCellNameType(UTF8Type.instance);
-                }
-                else
-                {
-                    stmt.comparator = definedMultiCellCollections == null
-                                    ? new CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
-                                    : new CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(), ColumnToCollectionType.getInstance(definedMultiCellCollections));
-                }
+                stmt.columnAliases.add(t);
+
+                AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
+                if (type instanceof CounterColumnType)
+                    throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t));
+                if (staticColumns.contains(t))
+                    throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
+                stmt.clusteringTypes.add(type);
             }
-            else
-            {
-                // If we use compact storage and have only one alias, it is a
-                // standard "dynamic" CF, otherwise it's a composite
-                if (useCompactStorage && columnAliases.size() == 1)
-                {
-                    if (definedMultiCellCollections != null)
-                        throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
-
-                    ColumnIdentifier alias = columnAliases.get(0);
-                    if (staticColumns.contains(alias))
-                        throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
-
-                    stmt.columnAliases.add(alias.bytes);
-                    AbstractType<?> at = getTypeAndRemove(stmt.columns, alias);
-                    if (at instanceof CounterColumnType)
-                        throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
-                    stmt.comparator = new SimpleDenseCellNameType(at);
-                }
-                else
-                {
-                    List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(columnAliases.size() + 1);
-                    for (ColumnIdentifier t : columnAliases)
-                    {
-                        stmt.columnAliases.add(t.bytes);
-
-                        AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
-                        if (type instanceof CounterColumnType)
-                            throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t));
-                        if (staticColumns.contains(t))
-                            throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
-                        types.add(type);
-                    }
 
-                    if (useCompactStorage)
-                    {
-                        if (definedMultiCellCollections != null)
-                            throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
-
-                        stmt.comparator = new CompoundDenseCellNameType(types);
-                    }
-                    else
-                    {
-                        stmt.comparator = definedMultiCellCollections == null
-                                        ? new CompoundSparseCellNameType(types)
-                                        : new CompoundSparseCellNameType.WithCollection(types, ColumnToCollectionType.getInstance(definedMultiCellCollections));
-                    }
-                }
+            // We've handled all the primary key columns, so stmt.columns only contains NON-PK columns. So
+            // if it's a counter table, make sure we don't have non-counter types
+            if (stmt.hasCounters)
+            {
+                for (AbstractType<?> type : stmt.columns.values())
+                    if (!type.isCounter())
+                        throw new InvalidRequestException("Cannot mix counter and non counter columns in the same table");
             }
 
-            if (!staticColumns.isEmpty())
+            // Dense means that on the thrift side, no part of the "thrift column name" stores a "CQL/metadata column name".
+            // This means COMPACT STORAGE with at least one clustering type (otherwise it's a thrift "static" CF).
+            stmt.isDense = useCompactStorage && !stmt.clusteringTypes.isEmpty();
+            // Compound means that on the thrift side, the "thrift column name" is a composite one. It's the case unless
+            // we use COMPACT STORAGE and we have either no clustering columns (thrift "static" CF) or
+            // only one of them (if more than one, it's a "dense composite").
+            stmt.isCompound = !(useCompactStorage && stmt.clusteringTypes.size() <= 1);
+
+            // For COMPACT STORAGE, we reject any "feature" that we wouldn't be able to translate back to thrift.
+            if (useCompactStorage)
             {
-                // Only CQL3 tables can have static columns
-                if (useCompactStorage)
+                if (!stmt.collections.isEmpty())
+                    throw new InvalidRequestException("Non-frozen collection types are not supported with COMPACT STORAGE");
+                if (!staticColumns.isEmpty())
                     throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
-                // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
-                if (columnAliases.isEmpty())
-                    throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
-            }
 
-            if (useCompactStorage && !stmt.columnAliases.isEmpty())
-            {
-                if (stmt.columns.isEmpty())
+                if (stmt.clusteringTypes.isEmpty())
                 {
-                    // The only value we'll insert will be the empty one, so the default validator don't matter
-                    stmt.defaultValidator = BytesType.instance;
-                    // We need to distinguish between
-                    //   * I'm upgrading from thrift so the valueAlias is null
-                    //   * I've defined my table with only a PK (and the column value will be empty)
-                    // So, we use an empty valueAlias (rather than null) for the second case
-                    stmt.valueAlias = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                    // It's a thrift "static CF" so there should be some column definitions
+                    if (stmt.columns.isEmpty())
+                        throw new InvalidRequestException("No definition found that is not part of the PRIMARY KEY");
                 }
-                else
+
+                if (stmt.isDense)
                 {
+                    // We can have no columns (only the PK), but we can't have more than one.
                     if (stmt.columns.size() > 1)
                         throw new InvalidRequestException(String.format("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: %s)", StringUtils.join(stmt.columns.keySet(), ", ")));
-
-                    Map.Entry<ColumnIdentifier, AbstractType> lastEntry = stmt.columns.entrySet().iterator().next();
-                    stmt.defaultValidator = lastEntry.getValue();
-                    stmt.valueAlias = lastEntry.getKey().bytes;
-                    stmt.columns.remove(lastEntry.getKey());
+                }
+                else
+                {
+                    // we are in the "static" case, so we need at least one column defined. For non-compact however, having
+                    // just the PK is fine.
+                    if (stmt.columns.isEmpty())
+                        throw new InvalidRequestException("COMPACT STORAGE with non-composite PRIMARY KEY require one column not part of the PRIMARY KEY, none given");
                 }
             }
             else
             {
-                // For compact, we are in the "static" case, so we need at least one column defined. For non-compact however, having
-                // just the PK is fine since we have CQL3 row marker.
-                if (useCompactStorage && stmt.columns.isEmpty())
-                    throw new InvalidRequestException("COMPACT STORAGE with non-composite PRIMARY KEY require one column not part of the PRIMARY KEY, none given");
-
-                // There is no way to insert/access a column that is not defined for non-compact storage, so
-                // the actual validator don't matter much (except that we want to recognize counter CF as limitation apply to them).
-                stmt.defaultValidator = !stmt.columns.isEmpty() && (stmt.columns.values().iterator().next() instanceof CounterColumnType)
-                    ? CounterColumnType.instance
-                    : BytesType.instance;
+                if (stmt.clusteringTypes.isEmpty() && !staticColumns.isEmpty())
+                {
+                    // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
+                    if (columnAliases.isEmpty())
+                        throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+                }
             }
 
-
             // If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
             if (!definedOrdering.isEmpty())
             {