You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/06/18 18:51:27 UTC

[11/12] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f8516da
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f8516da
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f8516da

Branch: refs/heads/trunk
Commit: 1f8516da82fc19500d3b00d37780265c34a358b0
Parents: a133992 0452e74
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Thu Jun 18 17:39:43 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Jun 18 17:41:06 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cql3/ColumnIdentifier.java |   6 +-
 .../cassandra/cql3/ColumnSpecification.java     |   6 +
 .../restrictions/StatementRestrictions.java     |  22 +-
 .../selection/AbstractFunctionSelector.java     |  13 +
 .../cassandra/cql3/selection/FieldSelector.java |   8 +-
 .../cassandra/cql3/selection/Selectable.java    |  10 +-
 .../cassandra/cql3/selection/Selection.java     |  58 +--
 .../cql3/selection/SelectionColumnMapping.java  | 118 +++++++
 .../cql3/selection/SelectionColumns.java        |  18 +
 .../cassandra/cql3/selection/Selector.java      |  13 +
 .../cql3/selection/SelectorFactories.java       |   2 +-
 .../cql3/selection/SimpleSelector.java          |  17 +-
 .../cql3/selection/WritetimeOrTTLSelector.java  |  14 +-
 .../cql3/statements/SelectStatement.java        |  28 +-
 .../selection/SelectionColumnMappingTest.java   | 353 +++++++++++++++++++
 16 files changed, 633 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 76a42ab,899ea7c..fb44c9a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,7 +1,17 @@@
 -2.1.7
 +2.2
 + * Rename class for DATE type in Java driver (CASSANDRA-9563)
 + * Duplicate compilation of UDFs on coordinator (CASSANDRA-9475)
 + * Fix connection leak in CqlRecordWriter (CASSANDRA-9576)
 + * Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573)
 + * Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229)
 + * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566)
 + * Fix deprecated repair JMX API (CASSANDRA-9570)
 + * Add logback metrics (CASSANDRA-9378)
 +Merged from 2.1:
   * Fix memory leak in Ref due to ConcurrentLinkedQueue.remove() behaviour (CASSANDRA-9549)
 + * Make rebuild only run one at a time (CASSANDRA-9119)
  Merged from 2.0
+  * Expose some internals of SelectStatement for inspection (CASSANDRA-9532)
   * ArrivalWindow should use primitives (CASSANDRA-9496)
   * Periodically submit background compaction tasks (CASSANDRA-9592)
   * Set HAS_MORE_PAGES flag to false when PagingState is null (CASSANDRA-9571)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 467b214,1501479..823af94
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@@ -17,20 -17,16 +17,20 @@@
   */
  package org.apache.cassandra.cql3;
  
 -import java.util.Locale;
+ import java.nio.ByteBuffer;
 +import java.util.List;
 +import java.util.Locale;
- import java.nio.ByteBuffer;
  
  import org.apache.cassandra.cache.IMeasurableMemory;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.cql3.statements.Selectable;
 +import org.apache.cassandra.cql3.selection.Selectable;
 +import org.apache.cassandra.cql3.selection.Selector;
 +import org.apache.cassandra.cql3.selection.SimpleSelector;
  import org.apache.cassandra.db.marshal.AbstractType;
- import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.db.marshal.CompositeType;
  import org.apache.cassandra.db.marshal.UTF8Type;
++import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.ObjectSizes;
  import org.apache.cassandra.utils.memory.AbstractAllocator;
@@@ -109,14 -105,6 +109,14 @@@ public class ColumnIdentifier extends o
          return new ColumnIdentifier(allocator.clone(bytes), text);
      }
  
 +    public Selector.Factory newSelectorFactory(CFMetaData cfm, List<ColumnDefinition> defs) throws InvalidRequestException
 +    {
 +        ColumnDefinition def = cfm.getColumnDefinition(this);
 +        if (def == null)
 +            throw new InvalidRequestException(String.format("Undefined name %s in selection clause", this));
 +
-         return SimpleSelector.newFactory(def.name.toString(), addAndGetIndex(def, defs), def.type);
++        return SimpleSelector.newFactory(def, addAndGetIndex(def, defs));
 +    }
  
      /**
       * Because Thrift-created tables may have a non-text comparator, we cannot determine the proper 'key' until

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/ColumnSpecification.java
index bc5a914,f5f921d..e12a57e
--- a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
@@@ -17,11 -17,9 +17,12 @@@
   */
  package org.apache.cassandra.cql3;
  
+ import com.google.common.base.Objects;
 -
  import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.ReversedType;
 +
 +import java.util.Collection;
 +import java.util.Iterator;
  
  public class ColumnSpecification
  {
@@@ -38,51 -36,23 +39,56 @@@
          this.type = type;
      }
  
 -    public boolean equals(Object obj)
 +    /**
 +     * Returns a new <code>ColumnSpecification</code> for the same column but with the specified alias.
 +     *
 +     * @param alias the column alias
 +     * @return a new <code>ColumnSpecification</code> for the same column but with the specified alias.
 +     */
 +    public ColumnSpecification withAlias(ColumnIdentifier alias)
 +    {
 +        return new ColumnSpecification(ksName, cfName, alias, type);
 +    }
 +
 +    public boolean isReversedType()
      {
 -        if (null == obj)
 +        return type instanceof ReversedType;
 +    }
 +
 +    /**
 +     * Returns true if all ColumnSpecifications are in the same table, false otherwise.
 +     */
 +    public static boolean allInSameTable(Collection<ColumnSpecification> names)
 +    {
 +        if (names == null || names.isEmpty())
              return false;
  
 -        if(!(obj instanceof ColumnSpecification))
 +        Iterator<ColumnSpecification> iter = names.iterator();
 +        ColumnSpecification first = iter.next();
 +        while (iter.hasNext())
 +        {
 +            ColumnSpecification name = iter.next();
 +            if (!name.ksName.equals(first.ksName) || !name.cfName.equals(first.cfName))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean equals(Object other)
 +    {
 +        if (!(other instanceof ColumnSpecification))
              return false;
  
 -        ColumnSpecification other = (ColumnSpecification)obj;
 -        return Objects.equal(ksName, other.ksName)
 -            && Objects.equal(cfName, other.cfName)
 -            && Objects.equal(name, other.name)
 -            && Objects.equal(type, other.type);
 +        ColumnSpecification that = (ColumnSpecification) other;
 +        return this.ksName.equals(that.ksName) &&
 +               this.cfName.equals(that.cfName) &&
 +               this.name.equals(that.name) &&
 +               this.type.equals(that.type);
      }
+ 
+     public int hashCode()
+     {
+         return Objects.hashCode(ksName, cfName, name, type);
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index f848e2e,0000000..c10a56a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@@ -1,572 -1,0 +1,572 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * The restrictions corresponding to the relations specified on the where-clause of CQL query.
 + */
 +public final class StatementRestrictions
 +{
 +    /**
 +     * The Column Family meta data
 +     */
 +    public final CFMetaData cfm;
 +
 +    /**
 +     * Restrictions on partitioning columns
 +     */
 +    private PrimaryKeyRestrictions partitionKeyRestrictions;
 +
 +    /**
 +     * Restrictions on clustering columns
 +     */
 +    private PrimaryKeyRestrictions clusteringColumnsRestrictions;
 +
 +    /**
 +     * Restriction on non-primary key columns (i.e. secondary index restrictions)
 +     */
 +    private RestrictionSet nonPrimaryKeyRestrictions;
 +
 +    /**
 +     * The restrictions used to build the index expressions
 +     */
 +    private final List<Restrictions> indexRestrictions = new ArrayList<>();
 +
 +    /**
 +     * <code>true</code> if the secondary index need to be queried, <code>false</code> otherwise
 +     */
 +    private boolean usesSecondaryIndexing;
 +
 +    /**
 +     * Specify if the query will return a range of partition keys.
 +     */
 +    private boolean isKeyRange;
 +
 +    /**
 +     * Creates a new empty <code>StatementRestrictions</code>.
 +     *
 +     * @param cfm the column family meta data
 +     * @return a new empty <code>StatementRestrictions</code>.
 +     */
 +    public static StatementRestrictions empty(CFMetaData cfm)
 +    {
 +        return new StatementRestrictions(cfm);
 +    }
 +
 +    private StatementRestrictions(CFMetaData cfm)
 +    {
 +        this.cfm = cfm;
 +        this.partitionKeyRestrictions = new PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType());
 +        this.clusteringColumnsRestrictions = new PrimaryKeyRestrictionSet(cfm.comparator);
 +        this.nonPrimaryKeyRestrictions = new RestrictionSet();
 +    }
 +
 +    public StatementRestrictions(CFMetaData cfm,
 +            List<Relation> whereClause,
 +            VariableSpecifications boundNames,
 +            boolean selectsOnlyStaticColumns,
 +            boolean selectACollection) throws InvalidRequestException
 +    {
 +        this.cfm = cfm;
 +        this.partitionKeyRestrictions = new PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType());
 +        this.clusteringColumnsRestrictions = new PrimaryKeyRestrictionSet(cfm.comparator);
 +        this.nonPrimaryKeyRestrictions = new RestrictionSet();
 +
 +        /*
 +         * WHERE clause. For a given entity, rules are: - EQ relation conflicts with anything else (including a 2nd EQ)
 +         * - Can't have more than one LT(E) relation (resp. GT(E) relation) - IN relation are restricted to row keys
 +         * (for now) and conflicts with anything else (we could allow two IN for the same entity but that doesn't seem
 +         * very useful) - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value
 +         * in CQL so far)
 +         */
 +        for (Relation relation : whereClause)
 +            addRestriction(relation.toRestriction(cfm, boundNames));
 +
 +        ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
 +        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
 +
 +        boolean hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
 +        boolean hasQueriableIndex = hasQueriableClusteringColumnIndex
 +                || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
 +                || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
 +
 +        // At this point, the select statement is fully constructed, but we still have a few things to validate
 +        processPartitionKeyRestrictions(hasQueriableIndex);
 +
 +        // Some but not all of the partition key columns have been specified;
 +        // hence we need to turn these restrictions into index expressions.
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(partitionKeyRestrictions);
 +
 +        checkFalse(selectsOnlyStaticColumns && hasClusteringColumnsRestriction(),
 +                   "Cannot restrict clustering columns when selecting only static columns");
 +
 +        processClusteringColumnsRestrictions(hasQueriableIndex, selectACollection);
 +
 +        // Covers indexes on the first clustering column (among others).
 +        if (isKeyRange && hasQueriableClusteringColumnIndex)
 +            usesSecondaryIndexing = true;
 +
 +        usesSecondaryIndexing = usesSecondaryIndexing || clusteringColumnsRestrictions.isContains();
 +
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(clusteringColumnsRestrictions);
 +
 +        // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
 +        // there are restrictions not covered by the PK.
 +        if (!nonPrimaryKeyRestrictions.isEmpty())
 +        {
 +            usesSecondaryIndexing = true;
 +            indexRestrictions.add(nonPrimaryKeyRestrictions);
 +        }
 +
 +        if (usesSecondaryIndexing)
 +            validateSecondaryIndexSelections(selectsOnlyStaticColumns);
 +    }
 +
 +    private void addRestriction(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
 +        else if (restriction.isOnToken())
 +            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
 +        else
 +            addSingleColumnRestriction((SingleColumnRestriction) restriction);
 +    }
 +
 +    public Iterable<Function> getFunctions()
 +    {
 +        return Iterables.concat(partitionKeyRestrictions.getFunctions(),
 +                                clusteringColumnsRestrictions.getFunctions(),
 +                                nonPrimaryKeyRestrictions.getFunctions());
 +    }
 +
 +    private void addSingleColumnRestriction(SingleColumnRestriction restriction) throws InvalidRequestException
 +    {
 +        ColumnDefinition def = restriction.columnDef;
 +        if (def.isPartitionKey())
 +            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
 +        else if (def.isClusteringColumn())
 +            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
 +        else
 +            nonPrimaryKeyRestrictions = nonPrimaryKeyRestrictions.addRestriction(restriction);
 +    }
 +
 +    /**
 +     * Checks if the restrictions on the partition key is an IN restriction.
 +     *
 +     * @return <code>true</code> if the restrictions on the partition key is an IN restriction, <code>false</code>
 +     * otherwise.
 +     */
 +    public boolean keyIsInRelation()
 +    {
 +        return partitionKeyRestrictions.isIN();
 +    }
 +
 +    /**
 +     * Checks if the query request a range of partition keys.
 +     *
 +     * @return <code>true</code> if the query request a range of partition keys, <code>false</code> otherwise.
 +     */
 +    public boolean isKeyRange()
 +    {
 +        return this.isKeyRange;
 +    }
 +
 +    /**
 +     * Checks if the secondary index need to be queried.
 +     *
 +     * @return <code>true</code> if the secondary index need to be queried, <code>false</code> otherwise.
 +     */
 +    public boolean usesSecondaryIndexing()
 +    {
 +        return this.usesSecondaryIndexing;
 +    }
 +
 +    private void processPartitionKeyRestrictions(boolean hasQueriableIndex) throws InvalidRequestException
 +    {
 +        // If there is a queriable index, no special conditions are required on the other restrictions.
 +        // But we still need to know 2 things:
 +        // - If we don't have a queriable index, is the query ok
 +        // - Is it queriable without 2ndary index, which is always more efficient
 +        // If a component of the partition key is restricted by a relation, all preceding
 +        // components must have an EQ. Only the last partition key component can be in an IN relation.
 +        if (partitionKeyRestrictions.isOnToken())
 +            isKeyRange = true;
 +
 +        if (hasPartitionKeyUnrestrictedComponents())
 +        {
 +            if (!partitionKeyRestrictions.isEmpty())
 +            {
 +                if (!hasQueriableIndex)
 +                    throw invalidRequest("Partition key parts: %s must be restricted as other parts are",
 +                                         Joiner.on(", ").join(getPartitionKeyUnrestrictedComponents()));
 +            }
 +
 +            isKeyRange = true;
 +            usesSecondaryIndexing = hasQueriableIndex;
 +        }
 +    }
 +
 +    /**
 +     * Checks if the partition key has some unrestricted components.
 +     * @return <code>true</code> if the partition key has some unrestricted components, <code>false</code> otherwise.
 +     */
 +    private boolean hasPartitionKeyUnrestrictedComponents()
 +    {
 +        return partitionKeyRestrictions.size() <  cfm.partitionKeyColumns().size();
 +    }
 +
 +    /**
 +     * Returns the partition key components that are not restricted.
 +     * @return the partition key components that are not restricted.
 +     */
 +    private List<ColumnIdentifier> getPartitionKeyUnrestrictedComponents()
 +    {
 +        List<ColumnDefinition> list = new ArrayList<>(cfm.partitionKeyColumns());
 +        list.removeAll(partitionKeyRestrictions.getColumnDefs());
 +        return ColumnDefinition.toIdentifiers(list);
 +    }
 +
 +    /**
 +     * Processes the clustering column restrictions.
 +     *
 +     * @param hasQueriableIndex <code>true</code> if some of the queried data are indexed, <code>false</code> otherwise
 +     * @param selectACollection <code>true</code> if the query should return a collection column
 +     * @throws InvalidRequestException if the request is invalid
 +     */
 +    private void processClusteringColumnsRestrictions(boolean hasQueriableIndex,
 +                                                      boolean selectACollection) throws InvalidRequestException
 +    {
 +        checkFalse(clusteringColumnsRestrictions.isIN() && selectACollection,
 +                   "Cannot restrict clustering columns by IN relations when a collection is selected by the query");
 +        checkFalse(clusteringColumnsRestrictions.isContains() && !hasQueriableIndex,
 +                   "Cannot restrict clustering columns by a CONTAINS relation without a secondary index");
 +
 +        if (hasClusteringColumnsRestriction())
 +        {
 +            List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
 +            List<ColumnDefinition> restrictedColumns = new LinkedList<>(clusteringColumnsRestrictions.getColumnDefs());
 +
 +            for (int i = 0, m = restrictedColumns.size(); i < m; i++)
 +            {
 +                ColumnDefinition clusteringColumn = clusteringColumns.get(i);
 +                ColumnDefinition restrictedColumn = restrictedColumns.get(i);
 +
 +                if (!clusteringColumn.equals(restrictedColumn))
 +                {
 +                    checkTrue(hasQueriableIndex,
 +                              "PRIMARY KEY column \"%s\" cannot be restricted as preceding column \"%s\" is not restricted",
 +                              restrictedColumn.name,
 +                              clusteringColumn.name);
 +
 +                    usesSecondaryIndexing = true; // handle gaps and non-keyrange cases.
 +                    break;
 +                }
 +            }
 +        }
 +
 +        if (clusteringColumnsRestrictions.isContains())
 +            usesSecondaryIndexing = true;
 +    }
 +
 +    public List<IndexExpression> getIndexExpressions(SecondaryIndexManager indexManager,
 +                                                     QueryOptions options) throws InvalidRequestException
 +    {
 +        if (!usesSecondaryIndexing || indexRestrictions.isEmpty())
 +            return Collections.emptyList();
 +
 +        List<IndexExpression> expressions = new ArrayList<>();
 +        for (Restrictions restrictions : indexRestrictions)
 +            restrictions.addIndexExpressionTo(expressions, indexManager, options);
 +
 +        return expressions;
 +    }
 +
 +    /**
 +     * Returns the partition keys for which the data is requested.
 +     *
 +     * @param options the query options
 +     * @return the partition keys for which the data is requested.
 +     * @throws InvalidRequestException if the partition keys cannot be retrieved
 +     */
 +    public Collection<ByteBuffer> getPartitionKeys(final QueryOptions options) throws InvalidRequestException
 +    {
 +        return partitionKeyRestrictions.values(options);
 +    }
 +
 +    /**
 +     * Returns the specified bound of the partition key.
 +     *
 +     * @param b the boundary type
 +     * @param options the query options
 +     * @return the specified bound of the partition key
 +     * @throws InvalidRequestException if the boundary cannot be retrieved
 +     */
 +    private ByteBuffer getPartitionKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the
 +        // first
 +        // component of a composite partition key).
 +        if (hasPartitionKeyUnrestrictedComponents())
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        // We deal with IN queries for keys in other places, so we know buildBound will return only one result
 +        return partitionKeyRestrictions.bounds(b, options).get(0);
 +    }
 +
 +    /**
 +     * Returns the partition key bounds.
 +     *
 +     * @param options the query options
 +     * @return the partition key bounds
 +     * @throws InvalidRequestException if the query is invalid
 +     */
 +    public AbstractBounds<RowPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException
 +    {
 +        IPartitioner p = StorageService.getPartitioner();
 +
 +        if (partitionKeyRestrictions.isOnToken())
 +        {
 +            return getPartitionKeyBoundsForTokenRestrictions(p, options);
 +        }
 +
 +        return getPartitionKeyBounds(p, options);
 +    }
 +
 +    private AbstractBounds<RowPosition> getPartitionKeyBounds(IPartitioner p,
 +                                                              QueryOptions options) throws InvalidRequestException
 +    {
 +        ByteBuffer startKeyBytes = getPartitionKeyBound(Bound.START, options);
 +        ByteBuffer finishKeyBytes = getPartitionKeyBound(Bound.END, options);
 +
 +        RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
 +        RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
 +
 +        if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum())
 +            return null;
 +
 +        if (partitionKeyRestrictions.isInclusive(Bound.START))
 +        {
 +            return partitionKeyRestrictions.isInclusive(Bound.END)
 +                    ? new Bounds<>(startKey, finishKey)
 +                    : new IncludingExcludingBounds<>(startKey, finishKey);
 +        }
 +
 +        return partitionKeyRestrictions.isInclusive(Bound.END)
 +                ? new Range<>(startKey, finishKey)
 +                : new ExcludingBounds<>(startKey, finishKey);
 +    }
 +
 +    private AbstractBounds<RowPosition> getPartitionKeyBoundsForTokenRestrictions(IPartitioner p,
 +                                                                                  QueryOptions options)
 +                                                                                          throws InvalidRequestException
 +    {
 +        Token startToken = getTokenBound(Bound.START, options, p);
 +        Token endToken = getTokenBound(Bound.END, options, p);
 +
 +        boolean includeStart = partitionKeyRestrictions.isInclusive(Bound.START);
 +        boolean includeEnd = partitionKeyRestrictions.isInclusive(Bound.END);
 +
 +        /*
 +         * If we ask SP.getRangeSlice() for (token(200), token(200)], it will happily return the whole ring.
 +         * However, wrapping range doesn't really make sense for CQL, and we want to return an empty result in that
 +         * case (CASSANDRA-5573). So special case to create a range that is guaranteed to be empty.
 +         *
 +         * In practice, we want to return an empty result set if either startToken > endToken, or both are equal but
 +         * one of the bounds is excluded (since [a, a] can contain something, but not (a, a], [a, a) or (a, a)).
 +         * Note though that in the case where startToken or endToken is the minimum token, then this special case
 +         * rule should not apply.
 +         */
 +        int cmp = startToken.compareTo(endToken);
 +        if (!startToken.isMinimum() && !endToken.isMinimum()
 +                && (cmp > 0 || (cmp == 0 && (!includeStart || !includeEnd))))
 +            return null;
 +
 +        RowPosition start = includeStart ? startToken.minKeyBound() : startToken.maxKeyBound();
 +        RowPosition end = includeEnd ? endToken.maxKeyBound() : endToken.minKeyBound();
 +
 +        return new Range<>(start, end);
 +    }
 +
 +    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException
 +    {
 +        if (!partitionKeyRestrictions.hasBound(b))
 +            return p.getMinimumToken();
 +
 +        ByteBuffer value = partitionKeyRestrictions.bounds(b, options).get(0);
 +        checkNotNull(value, "Invalid null token value");
 +        return p.getTokenFactory().fromByteArray(value);
 +    }
 +
 +    /**
 +     * Checks if the query does not contain any restriction on the clustering columns.
 +     *
 +     * @return <code>true</code> if the query does not contain any restriction on the clustering columns,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean hasNoClusteringColumnsRestriction()
 +    {
 +        return clusteringColumnsRestrictions.isEmpty();
 +    }
 +
++    /**
++     * Checks if the query has some restrictions on the clustering columns.
++     *
++     * @return <code>true</code> if the query has some restrictions on the clustering columns,
++     * <code>false</code> otherwise.
++     */
++    public boolean hasClusteringColumnsRestriction()
++    {
++        return !clusteringColumnsRestrictions.isEmpty();
++    }
++
 +    // For non-composite slices, we don't support internally the difference between exclusive and
 +    // inclusive bounds, so we deal with it manually.
 +    public boolean isNonCompositeSliceWithExclusiveBounds()
 +    {
 +        return !cfm.comparator.isCompound()
 +                && clusteringColumnsRestrictions.isSlice()
 +                && (!clusteringColumnsRestrictions.isInclusive(Bound.START) || !clusteringColumnsRestrictions.isInclusive(Bound.END));
 +    }
 +
 +    /**
 +     * Returns the requested clustering columns as <code>Composite</code>s.
 +     *
 +     * @param options the query options
 +     * @return the requested clustering columns as <code>Composite</code>s
 +     * @throws InvalidRequestException if the query is not valid
 +     */
 +    public List<Composite> getClusteringColumnsAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.valuesAsComposites(options);
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns as <code>Composites</code>.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns as <code>Composites</code>
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<Composite> getClusteringColumnsBoundsAsComposites(Bound b,
 +                                                                  QueryOptions options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.boundsAsComposites(b, options);
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<ByteBuffer> getClusteringColumnsBounds(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        return clusteringColumnsRestrictions.bounds(b, options);
 +    }
 +
 +    /**
 +     * Checks if the bounds (start or end) of the clustering columns are inclusive.
 +     *
 +     * @param bound the bound type
 +     * @return <code>true</code> if the bounds (start or end) of the clustering columns are inclusive,
 +     * <code>false</code> otherwise
 +     */
 +    public boolean areRequestedBoundsInclusive(Bound bound)
 +    {
 +        return clusteringColumnsRestrictions.isInclusive(bound);
 +    }
 +
 +    /**
 +     * Checks if the query returns a range of columns.
 +     *
 +     * @return <code>true</code> if the query returns a range of columns, <code>false</code> otherwise.
 +     */
 +    public boolean isColumnRange()
 +    {
 +        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
 +        // Static CF (non dense but non composite) never entails a column slice however
 +        if (!cfm.comparator.isDense())
 +            return cfm.comparator.isCompound();
 +
 +        // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about
 +        // CASSANDRA-5762),
 +        // it is a range query if it has at least one of the column aliases for which no relation is defined or is not EQ.
 +        return clusteringColumnsRestrictions.size() < cfm.clusteringColumns().size() || clusteringColumnsRestrictions.isSlice();
 +    }
 +
 +    /**
 +     * Checks if the query needs to use filtering.
 +     * @return <code>true</code> if the query needs to use filtering, <code>false</code> otherwise.
 +     */
 +    public boolean needFiltering()
 +    {
 +        int numberOfRestrictedColumns = 0;
 +        for (Restrictions restrictions : indexRestrictions)
 +            numberOfRestrictedColumns += restrictions.size();
 +
 +        return numberOfRestrictedColumns > 1
 +                || (numberOfRestrictedColumns == 0 && !clusteringColumnsRestrictions.isEmpty())
 +                || (numberOfRestrictedColumns != 0
 +                        && nonPrimaryKeyRestrictions.hasMultipleContains());
 +    }
 +
 +    private void validateSecondaryIndexSelections(boolean selectsOnlyStaticColumns) throws InvalidRequestException
 +    {
 +        checkFalse(keyIsInRelation(),
 +                   "Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
 +        // When the user only select static columns, the intent is that we don't query the whole partition but just
 +        // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on
 +        // static columns
 +        // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical.
 +        checkFalse(selectsOnlyStaticColumns, "Queries using 2ndary indexes don't support selecting only static columns");
 +    }
 +
-     /**
-      * Checks if the query has some restrictions on the clustering columns.
-      *
-      * @return <code>true</code> if the query has some restrictions on the clustering columns,
-      * <code>false</code> otherwise.
-      */
-     private boolean hasClusteringColumnsRestriction()
-     {
-         return !clusteringColumnsRestrictions.isEmpty();
-     }
- 
 +    public void reverse()
 +    {
 +        clusteringColumnsRestrictions = new ReversedPrimaryKeyRestrictions(clusteringColumnsRestrictions);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index 1dd1903,0000000..fa40152
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@@ -1,120 -1,0 +1,133 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import com.google.common.collect.Iterables;
 +import org.apache.commons.lang3.text.StrBuilder;
 +
++import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +abstract class AbstractFunctionSelector<T extends Function> extends Selector
 +{
 +    protected final T fun;
 +
 +    /**
 +     * The list used to pass the function arguments is recycled to avoid the cost of instantiating a new list
 +     * with each function call.
 +     */
 +    protected final List<ByteBuffer> args;
 +    protected final List<Selector> argSelectors;
 +
 +    public static Factory newFactory(final Function fun, final SelectorFactories factories) throws InvalidRequestException
 +    {
 +        if (fun.isAggregate())
 +        {
 +            if (factories.doesAggregation())
 +                throw new InvalidRequestException("aggregate functions cannot be used as arguments of aggregate functions");
 +        }
 +        else
 +        {
 +            if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions())
 +                throw new InvalidRequestException(String.format("arguments of function %s must be either all aggregates or no aggregates",
 +                                                                fun.name()));
 +        }
 +
 +        return new Factory()
 +        {
 +            protected String getColumnName()
 +            {
 +                return new StrBuilder(fun.name().toString()).append('(')
 +                                                            .appendWithSeparators(factories.getColumnNames(), ", ")
 +                                                            .append(')')
 +                                                            .toString();
 +            }
 +
 +            protected AbstractType<?> getReturnType()
 +            {
 +                return fun.returnType();
 +            }
 +
++            protected void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultsColumn)
++            {
++                for (Factory factory : factories)
++                   factory.addColumnMapping(mapping, resultsColumn);
++
++                if (mapping.getMappings().get(resultsColumn).isEmpty())
++                    // add a null mapping for cases where there are no
++                    // further selectors, such as no-arg functions and count
++                    mapping.addMapping(resultsColumn, null);
++
++            }
++
 +            public Iterable<Function> getFunctions()
 +            {
 +                return Iterables.concat(fun.getFunctions(), factories.getFunctions());
 +            }
 +
 +            public Selector newInstance() throws InvalidRequestException
 +            {
 +                return fun.isAggregate() ? new AggregateFunctionSelector(fun, factories.newInstances())
 +                                         : new ScalarFunctionSelector(fun, factories.newInstances());
 +            }
 +
 +            public boolean isWritetimeSelectorFactory()
 +            {
 +                return factories.containsWritetimeSelectorFactory();
 +            }
 +
 +            public boolean isTTLSelectorFactory()
 +            {
 +                return factories.containsTTLSelectorFactory();
 +            }
 +
 +            public boolean isAggregateSelectorFactory()
 +            {
 +                return fun.isAggregate() || factories.containsOnlyAggregateFunctions();
 +            }
 +        };
 +    }
 +
 +    protected AbstractFunctionSelector(T fun, List<Selector> argSelectors)
 +    {
 +        this.fun = fun;
 +        this.argSelectors = argSelectors;
 +        this.args = Arrays.asList(new ByteBuffer[argSelectors.size()]);
 +    }
 +
 +    public AbstractType<?> getType()
 +    {
 +        return fun.returnType();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return new StrBuilder().append(fun.name())
 +                               .append("(")
 +                               .appendWithSeparators(argSelectors, ", ")
 +                               .append(")")
 +                               .toString();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index 76dbb22,0000000..63b6cc6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@@ -1,103 -1,0 +1,109 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +
++import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +final class FieldSelector extends Selector
 +{
 +    private final UserType type;
 +    private final int field;
 +    private final Selector selected;
 +
 +    public static Factory newFactory(final UserType type, final int field, final Selector.Factory factory)
 +    {
 +        return new Factory()
 +        {
 +            protected String getColumnName()
 +            {
 +                return String.format("%s.%s",
 +                                     factory.getColumnName(),
 +                                     UTF8Type.instance.getString(type.fieldName(field)));
 +            }
 +
 +            protected AbstractType<?> getReturnType()
 +            {
 +                return type.fieldType(field);
 +            }
 +
++            protected void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultsColumn)
++            {
++                factory.addColumnMapping(mapping, resultsColumn);
++            }
++
 +            public Selector newInstance() throws InvalidRequestException
 +            {
 +                return new FieldSelector(type, field, factory.newInstance());
 +            }
 +
 +            public boolean isAggregateSelectorFactory()
 +            {
 +                return factory.isAggregateSelectorFactory();
 +            }
 +        };
 +    }
 +
 +    public boolean isAggregate()
 +    {
 +        return false;
 +    }
 +
 +    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
 +    {
 +        selected.addInput(protocolVersion, rs);
 +    }
 +
 +    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
 +    {
 +        ByteBuffer value = selected.getOutput(protocolVersion);
 +        if (value == null)
 +            return null;
 +        ByteBuffer[] buffers = type.split(value);
 +        return field < buffers.length ? buffers[field] : null;
 +    }
 +
 +    public AbstractType<?> getType()
 +    {
 +        return type.fieldType(field);
 +    }
 +
 +    public void reset()
 +    {
 +        selected.reset();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("%s.%s", selected, UTF8Type.instance.getString(type.fieldName(field)));
 +    }
 +
 +    private FieldSelector(UserType type, int field, Selector selected)
 +    {
 +        this.type = type;
 +        this.field = field;
 +        this.selected = selected;
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/Selectable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selectable.java
index 4506111,0000000..ee134ee
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
@@@ -1,253 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
++import org.apache.commons.lang3.text.StrBuilder;
++
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
- import org.apache.cassandra.cql3.functions.Function;
- import org.apache.cassandra.cql3.functions.FunctionName;
- import org.apache.cassandra.cql3.functions.Functions;
- import org.apache.cassandra.cql3.functions.ToJsonFct;
++import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
- import org.apache.commons.lang3.text.StrBuilder;
 +
 +public abstract class Selectable
 +{
 +    public abstract Selector.Factory newSelectorFactory(CFMetaData cfm, List<ColumnDefinition> defs)
 +            throws InvalidRequestException;
 +
 +    protected static int addAndGetIndex(ColumnDefinition def, List<ColumnDefinition> l)
 +    {
 +        int idx = l.indexOf(def);
 +        if (idx < 0)
 +        {
 +            idx = l.size();
 +            l.add(def);
 +        }
 +        return idx;
 +    }
 +
 +    public static interface Raw
 +    {
 +        public Selectable prepare(CFMetaData cfm);
 +
 +        /**
 +         * Returns true if any processing is performed on the selected column.
 +         **/
 +        public boolean processesSelection();
 +    }
 +
 +    public static class WritetimeOrTTL extends Selectable
 +    {
 +        public final ColumnIdentifier id;
 +        public final boolean isWritetime;
 +
 +        public WritetimeOrTTL(ColumnIdentifier id, boolean isWritetime)
 +        {
 +            this.id = id;
 +            this.isWritetime = isWritetime;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return (isWritetime ? "writetime" : "ttl") + "(" + id + ")";
 +        }
 +
 +        public Selector.Factory newSelectorFactory(CFMetaData cfm,
 +                                                   List<ColumnDefinition> defs) throws InvalidRequestException
 +        {
 +            ColumnDefinition def = cfm.getColumnDefinition(id);
 +            if (def == null)
 +                throw new InvalidRequestException(String.format("Undefined name %s in selection clause", id));
 +            if (def.isPrimaryKeyColumn())
 +                throw new InvalidRequestException(
 +                        String.format("Cannot use selection function %s on PRIMARY KEY part %s",
 +                                      isWritetime ? "writeTime" : "ttl",
 +                                      def.name));
 +            if (def.type.isCollection())
 +                throw new InvalidRequestException(String.format("Cannot use selection function %s on collections",
 +                                                                isWritetime ? "writeTime" : "ttl"));
 +
-             return WritetimeOrTTLSelector.newFactory(def.name.toString(), addAndGetIndex(def, defs), isWritetime);
++            return WritetimeOrTTLSelector.newFactory(def, addAndGetIndex(def, defs), isWritetime);
 +        }
 +
 +        public static class Raw implements Selectable.Raw
 +        {
 +            private final ColumnIdentifier.Raw id;
 +            private final boolean isWritetime;
 +
 +            public Raw(ColumnIdentifier.Raw id, boolean isWritetime)
 +            {
 +                this.id = id;
 +                this.isWritetime = isWritetime;
 +            }
 +
 +            public WritetimeOrTTL prepare(CFMetaData cfm)
 +            {
 +                return new WritetimeOrTTL(id.prepare(cfm), isWritetime);
 +            }
 +
 +            public boolean processesSelection()
 +            {
 +                return true;
 +            }
 +        }
 +    }
 +
 +    public static class WithFunction extends Selectable
 +    {
 +        public final FunctionName functionName;
 +        public final List<Selectable> args;
 +
 +        public WithFunction(FunctionName functionName, List<Selectable> args)
 +        {
 +            this.functionName = functionName;
 +            this.args = args;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return new StrBuilder().append(functionName)
 +                                   .append("(")
 +                                   .appendWithSeparators(args, ", ")
 +                                   .append(")")
 +                                   .toString();
 +        }
 +
 +        public Selector.Factory newSelectorFactory(CFMetaData cfm,
 +                                                   List<ColumnDefinition> defs) throws InvalidRequestException
 +        {
 +            SelectorFactories factories  =
 +                    SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, cfm, defs);
 +
 +            // We need to circumvent the normal function lookup process for toJson() because instances of the function
 +            // are not pre-declared (because it can accept any type of argument).
 +            Function fun;
 +            if (functionName.equalsNativeFunction(ToJsonFct.NAME))
 +                fun = ToJsonFct.getInstance(factories.getReturnTypes());
 +            else
 +                fun = Functions.get(cfm.ksName, functionName, factories.newInstances(), cfm.ksName, cfm.cfName, null);
 +
 +            if (fun == null)
 +                throw new InvalidRequestException(String.format("Unknown function '%s'", functionName));
 +            if (fun.returnType() == null)
 +                throw new InvalidRequestException(String.format("Unknown function %s called in selection clause",
 +                                                                functionName));
 +
 +            return AbstractFunctionSelector.newFactory(fun, factories);
 +        }
 +
 +        public static class Raw implements Selectable.Raw
 +        {
 +            private final FunctionName functionName;
 +            private final List<Selectable.Raw> args;
 +
 +            public Raw(FunctionName functionName, List<Selectable.Raw> args)
 +            {
 +                this.functionName = functionName;
 +                this.args = args;
 +            }
 +
 +            public WithFunction prepare(CFMetaData cfm)
 +            {
 +                List<Selectable> preparedArgs = new ArrayList<>(args.size());
 +                for (Selectable.Raw arg : args)
 +                    preparedArgs.add(arg.prepare(cfm));
 +                return new WithFunction(functionName, preparedArgs);
 +            }
 +
 +            public boolean processesSelection()
 +            {
 +                return true;
 +            }
 +        }
 +    }
 +
 +    public static class WithFieldSelection extends Selectable
 +    {
 +        public final Selectable selected;
 +        public final ColumnIdentifier field;
 +
 +        public WithFieldSelection(Selectable selected, ColumnIdentifier field)
 +        {
 +            this.selected = selected;
 +            this.field = field;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s.%s", selected, field);
 +        }
 +
 +        public Selector.Factory newSelectorFactory(CFMetaData cfm,
 +                                                   List<ColumnDefinition> defs) throws InvalidRequestException
 +        {
 +            Selector.Factory factory = selected.newSelectorFactory(cfm, defs);
 +            AbstractType<?> type = factory.newInstance().getType();
 +            if (!(type instanceof UserType))
 +                throw new InvalidRequestException(
 +                        String.format("Invalid field selection: %s of type %s is not a user type",
 +                                      selected,
 +                                      type.asCQL3Type()));
 +
 +            UserType ut = (UserType) type;
 +            for (int i = 0; i < ut.size(); i++)
 +            {
 +                if (!ut.fieldName(i).equals(field.bytes))
 +                    continue;
 +                return FieldSelector.newFactory(ut, i, factory);
 +            }
 +            throw new InvalidRequestException(String.format("%s of type %s has no field %s",
 +                                                            selected,
 +                                                            type.asCQL3Type(),
 +                                                            field));
 +        }
 +
 +        public static class Raw implements Selectable.Raw
 +        {
 +            private final Selectable.Raw selected;
 +            private final ColumnIdentifier.Raw field;
 +
 +            public Raw(Selectable.Raw selected, ColumnIdentifier.Raw field)
 +            {
 +                this.selected = selected;
 +                this.field = field;
 +            }
 +
 +            public WithFieldSelection prepare(CFMetaData cfm)
 +            {
 +                return new WithFieldSelection(selected.prepare(cfm), field.prepare(cfm));
 +            }
 +
 +            public boolean processesSelection()
 +            {
 +                return true;
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selection.java
index 9c990ce,0000000..25278df
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@@ -1,535 -1,0 +1,547 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Objects;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.db.Cell;
 +import org.apache.cassandra.db.CounterCell;
 +import org.apache.cassandra.db.ExpiringCell;
 +import org.apache.cassandra.db.context.CounterContext;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public abstract class Selection
 +{
 +    /**
 +     * A predicate that returns <code>true</code> for static columns.
 +     */
 +    private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = new Predicate<ColumnDefinition>()
 +    {
 +        public boolean apply(ColumnDefinition def)
 +        {
 +            return def.isStatic();
 +        }
 +    };
 +
 +    private final CFMetaData cfm;
-     private final Collection<ColumnDefinition> columns;
++    private final List<ColumnDefinition> columns;
++    private final SelectionColumnMapping columnMapping;
 +    private final ResultSet.ResultMetadata metadata;
 +    private final boolean collectTimestamps;
 +    private final boolean collectTTLs;
 +
 +    protected Selection(CFMetaData cfm,
-                         Collection<ColumnDefinition> columns,
-                         List<ColumnSpecification> metadata,
++                        List<ColumnDefinition> columns,
++                        SelectionColumnMapping columnMapping,
 +                        boolean collectTimestamps,
 +                        boolean collectTTLs)
 +    {
 +        this.cfm = cfm;
 +        this.columns = columns;
-         this.metadata = new ResultSet.ResultMetadata(metadata);
++        this.columnMapping = columnMapping;
++        this.metadata = new ResultSet.ResultMetadata(columnMapping.getColumnSpecifications());
 +        this.collectTimestamps = collectTimestamps;
 +        this.collectTTLs = collectTTLs;
 +    }
 +
 +    // Overridden by SimpleSelection when appropriate.
 +    public boolean isWildcard()
 +    {
 +        return false;
 +    }    
 +
 +    /**
 +     * Checks if this selection contains static columns.
 +     * @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise.
 +     */
 +    public boolean containsStaticColumns()
 +    {
 +        if (!cfm.hasStaticColumns())
 +            return false;
 +
 +        if (isWildcard())
 +            return true;
 +
 +        return !Iterables.isEmpty(Iterables.filter(columns, STATIC_COLUMN_FILTER));
 +    }
 +
 +    /**
 +     * Checks if this selection contains only static columns (partition key columns are tolerated alongside them).
 +     * @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise.
 +     */
 +    public boolean containsOnlyStaticColumns()
 +    {
 +        if (!containsStaticColumns())
 +            return false;
 +
 +        if (isWildcard())
 +            return false;
 +
 +        for (ColumnDefinition def : getColumns())
 +        {
 +            if (!def.isPartitionKey() && !def.isStatic())
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Checks if this selection contains a collection.
 +     *
 +     * @return <code>true</code> if this selection contains a collection, <code>false</code> otherwise.
 +     */
 +    public boolean containsACollection()
 +    {
 +        if (!cfm.comparator.hasCollections())
 +            return false;
 +
 +        for (ColumnDefinition def : getColumns())
 +            if (def.type.isCollection() && def.type.isMultiCell())
 +                return true;
 +
 +        return false;
 +    }
 +
 +    /**
 +     * Returns the index of the specified column.
 +     *
 +     * @param def the column definition
 +     * @return the index of the specified column
 +     */
 +    public int indexOf(final ColumnDefinition def)
 +    {
 +        return Iterators.indexOf(getColumns().iterator(), new Predicate<ColumnDefinition>()
 +           {
 +               public boolean apply(ColumnDefinition n)
 +               {
 +                   return def.name.equals(n.name);
 +               }
 +           });
 +    }
 +
 +    public ResultSet.ResultMetadata getResultMetadata(boolean isJson)
 +    {
 +        if (!isJson)
 +            return metadata;
 +
 +        ColumnSpecification firstColumn = metadata.names.get(0);
 +        ColumnSpecification jsonSpec = new ColumnSpecification(firstColumn.ksName, firstColumn.cfName, Json.JSON_COLUMN_ID, UTF8Type.instance);
 +        return new ResultSet.ResultMetadata(Arrays.asList(jsonSpec));
 +    }
 +
 +    public static Selection wildcard(CFMetaData cfm)
 +    {
-         List<ColumnDefinition> all = new ArrayList<ColumnDefinition>(cfm.allColumns().size());
++        List<ColumnDefinition> all = new ArrayList<>(cfm.allColumns().size());
 +        Iterators.addAll(all, cfm.allColumnsInSelectOrder());
 +        return new SimpleSelection(cfm, all, true);
 +    }
 +
-     public static Selection forColumns(CFMetaData cfm, Collection<ColumnDefinition> columns)
++    public static Selection forColumns(CFMetaData cfm, List<ColumnDefinition> columns)
 +    {
 +        return new SimpleSelection(cfm, columns, false);
 +    }
 +
 +    public int addColumnForOrdering(ColumnDefinition c)
 +    {
 +        columns.add(c);
 +        metadata.addNonSerializedColumn(c);
 +        return columns.size() - 1;
 +    }
 +
 +    public Iterable<Function> getFunctions()
 +    {
 +        return Collections.emptySet();
 +    }
 +
 +    private static boolean processesSelection(List<RawSelector> rawSelectors)
 +    {
 +        for (RawSelector rawSelector : rawSelectors)
 +        {
 +            if (rawSelector.processesSelection())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors) throws InvalidRequestException
 +    {
-         List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>();
++        List<ColumnDefinition> defs = new ArrayList<>();
 +
 +        SelectorFactories factories =
 +                SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), cfm, defs);
-         List<ColumnSpecification> metadata = collectMetadata(cfm, rawSelectors, factories);
++        SelectionColumnMapping mapping = collectColumnMappings(cfm, rawSelectors, factories);
 +
-         return processesSelection(rawSelectors) ? new SelectionWithProcessing(cfm, defs, metadata, factories)
-                                                 : new SimpleSelection(cfm, defs, metadata, false);
++        return processesSelection(rawSelectors) ? new SelectionWithProcessing(cfm, defs, mapping, factories)
++                                                : new SimpleSelection(cfm, defs, mapping, false);
 +    }
 +
-     private static List<ColumnSpecification> collectMetadata(CFMetaData cfm,
-                                                              List<RawSelector> rawSelectors,
-                                                              SelectorFactories factories)
++    private static SelectionColumnMapping collectColumnMappings(CFMetaData cfm,
++                                                                List<RawSelector> rawSelectors,
++                                                                SelectorFactories factories)
 +    {
-         List<ColumnSpecification> metadata = new ArrayList<ColumnSpecification>(rawSelectors.size());
++        SelectionColumnMapping selectionColumns = SelectionColumnMapping.newMapping();
 +        Iterator<RawSelector> iter = rawSelectors.iterator();
 +        for (Selector.Factory factory : factories)
 +        {
 +            ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
 +            ColumnIdentifier alias = iter.next().alias;
-             metadata.add(alias == null ? colSpec : colSpec.withAlias(alias));
++            factory.addColumnMapping(selectionColumns,
++                                     alias == null ? colSpec : colSpec.withAlias(alias));
 +        }
-         return metadata;
++        return selectionColumns;
 +    }
 +
 +    protected abstract Selectors newSelectors() throws InvalidRequestException;
 +
 +    /**
 +     * @return the list of CQL3 columns whose values this selection needs.
 +     */
-     public Collection<ColumnDefinition> getColumns()
++    public List<ColumnDefinition> getColumns()
 +    {
 +        return columns;
 +    }
 +
++    /**
++     * @return the mappings between resultset columns and the underlying columns
++     */
++    public SelectionColumns getColumnMapping()
++    {
++        return columnMapping;
++    }
++
 +    public ResultSetBuilder resultSetBuilder(long now, boolean isJson) throws InvalidRequestException
 +    {
 +        return new ResultSetBuilder(now, isJson);
 +    }
 +
 +    public abstract boolean isAggregate();
 +
 +    @Override
 +    public String toString()
 +    {
 +        return Objects.toStringHelper(this)
 +                .add("columns", columns)
++                .add("columnMapping", columnMapping)
 +                .add("metadata", metadata)
 +                .add("collectTimestamps", collectTimestamps)
 +                .add("collectTTLs", collectTTLs)
 +                .toString();
 +    }
 +
 +    public class ResultSetBuilder
 +    {
 +        private final ResultSet resultSet;
 +
 +        /**
 +         * As multiple threads can access a <code>Selection</code> instance each <code>ResultSetBuilder</code> will use
 +         * its own <code>Selectors</code> instance.
 +         */
 +        private final Selectors selectors;
 +
 +        /*
 +         * We'll build CQL3 row one by one.
 +         * The currentRow is the values for the (CQL3) columns we've fetched.
 +         * We also collect timestamps and ttls for the case where the writetime and
 +         * ttl functions are used. Note that we might collect timestamp and/or ttls
 +         * we don't care about, but since the arrays below are allocated just once,
 +         * it doesn't matter performance-wise.
 +         */
 +        List<ByteBuffer> current;
 +        final long[] timestamps;
 +        final int[] ttls;
 +        final long now;
 +
 +        private final boolean isJson;
 +
 +        private ResultSetBuilder(long now, boolean isJson) throws InvalidRequestException
 +        {
 +            this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), new ArrayList<List<ByteBuffer>>());
 +            this.selectors = newSelectors();
 +            this.timestamps = collectTimestamps ? new long[columns.size()] : null;
 +            this.ttls = collectTTLs ? new int[columns.size()] : null;
 +            this.now = now;
 +            this.isJson = isJson;
 +        }
 +
 +        public void add(ByteBuffer v)
 +        {
 +            current.add(v);
 +        }
 +
 +        public void add(Cell c)
 +        {
 +            current.add(isDead(c) ? null : value(c));
 +            if (timestamps != null)
 +            {
 +                timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE : c.timestamp();
 +            }
 +            if (ttls != null)
 +            {
 +                int ttl = -1;
 +                if (!isDead(c) && c instanceof ExpiringCell)
 +                    ttl = c.getLocalDeletionTime() - (int) (now / 1000);
 +                ttls[current.size() - 1] = ttl;
 +            }
 +        }
 +
 +        private boolean isDead(Cell c)
 +        {
 +            return c == null || !c.isLive(now);
 +        }
 +
 +        public void newRow(int protocolVersion) throws InvalidRequestException
 +        {
 +            if (current != null)
 +            {
 +                selectors.addInputRow(protocolVersion, this);
 +                if (!selectors.isAggregate())
 +                {
 +                    resultSet.addRow(getOutputRow(protocolVersion));
 +                    selectors.reset();
 +                }
 +            }
 +            current = new ArrayList<>(columns.size());
 +        }
 +
 +        public ResultSet build(int protocolVersion) throws InvalidRequestException
 +        {
 +            if (current != null)
 +            {
 +                selectors.addInputRow(protocolVersion, this);
 +                resultSet.addRow(getOutputRow(protocolVersion));
 +                selectors.reset();
 +                current = null;
 +            }
 +
 +            if (resultSet.isEmpty() && selectors.isAggregate())
 +                resultSet.addRow(getOutputRow(protocolVersion));
 +            return resultSet;
 +        }
 +
 +        private List<ByteBuffer> getOutputRow(int protocolVersion)
 +        {
 +            List<ByteBuffer> outputRow = selectors.getOutputRow(protocolVersion);
 +            return isJson ? rowToJson(outputRow, protocolVersion)
 +                          : outputRow;
 +        }
 +
 +        private List<ByteBuffer> rowToJson(List<ByteBuffer> row, int protocolVersion)
 +        {
 +            StringBuilder sb = new StringBuilder("{");
 +            for (int i = 0; i < metadata.names.size(); i++)
 +            {
 +                if (i > 0)
 +                    sb.append(", ");
 +
 +                ColumnSpecification spec = metadata.names.get(i);
 +                String columnName = spec.name.toString();
 +                if (!columnName.equals(columnName.toLowerCase(Locale.US)))
 +                    columnName = "\"" + columnName + "\"";
 +
 +                ByteBuffer buffer = row.get(i);
 +                sb.append('"');
 +                sb.append(Json.JSON_STRING_ENCODER.quoteAsString(columnName));
 +                sb.append("\": ");
 +                if (buffer == null)
 +                    sb.append("null");
 +                else
 +                    sb.append(spec.type.toJSONString(buffer, protocolVersion));
 +            }
 +            sb.append("}");
 +            return Collections.singletonList(UTF8Type.instance.getSerializer().serialize(sb.toString()));
 +        }
 +
 +        private ByteBuffer value(Cell c)
 +        {
 +            return (c instanceof CounterCell)
 +                ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
 +                : c.value();
 +        }
 +    }
 +
 +    private static interface Selectors
 +    {
 +        public boolean isAggregate();
 +
 +        /**
 +         * Adds the current row of the specified <code>ResultSetBuilder</code>.
 +         *
 +         * @param rs the <code>ResultSetBuilder</code>
 +         * @throws InvalidRequestException
 +         */
 +        public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 +
 +        public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException;
 +
 +        public void reset();
 +    }
 +
 +    // Special-cased selection for when no function is used (this saves some allocations).
 +    private static class SimpleSelection extends Selection
 +    {
 +        private final boolean isWildcard;
 +
-         public SimpleSelection(CFMetaData cfm, Collection<ColumnDefinition> columns, boolean isWildcard)
++        public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> columns, boolean isWildcard)
 +        {
-             this(cfm, columns, new ArrayList<ColumnSpecification>(columns), isWildcard);
++            this(cfm, columns, SelectionColumnMapping.simpleMapping(columns), isWildcard);
 +        }
 +
 +        public SimpleSelection(CFMetaData cfm,
-                                Collection<ColumnDefinition> columns,
-                                List<ColumnSpecification> metadata,
++                               List<ColumnDefinition> columns,
++                               SelectionColumnMapping metadata,
 +                               boolean isWildcard)
 +        {
 +            /*
 +             * In theory, even a simple selection could contain the same column multiple times, so we
 +             * could filter those duplicates out of columns. But since we're very unlikely to
 +             * get many duplicates in practice, it's more efficient not to bother.
 +             */
 +            super(cfm, columns, metadata, false, false);
 +            this.isWildcard = isWildcard;
 +        }
 +
 +        @Override
 +        public boolean isWildcard()
 +        {
 +            return isWildcard;
 +        }
 +
 +        public boolean isAggregate()
 +        {
 +            return false;
 +        }
 +
 +        protected Selectors newSelectors()
 +        {
 +            return new Selectors()
 +            {
 +                private List<ByteBuffer> current;
 +
 +                public void reset()
 +                {
 +                    current = null;
 +                }
 +
 +                public List<ByteBuffer> getOutputRow(int protocolVersion)
 +                {
 +                    return current;
 +                }
 +
 +                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
 +                {
 +                    current = rs.current;
 +                }
 +
 +                public boolean isAggregate()
 +                {
 +                    return false;
 +                }
 +            };
 +        }
 +    }
 +
 +    private static class SelectionWithProcessing extends Selection
 +    {
 +        private final SelectorFactories factories;
 +
 +        public SelectionWithProcessing(CFMetaData cfm,
-                                        Collection<ColumnDefinition> columns,
-                                        List<ColumnSpecification> metadata,
++                                       List<ColumnDefinition> columns,
++                                       SelectionColumnMapping metadata,
 +                                       SelectorFactories factories) throws InvalidRequestException
 +        {
 +            super(cfm,
 +                  columns,
 +                  metadata,
 +                  factories.containsWritetimeSelectorFactory(),
 +                  factories.containsTTLSelectorFactory());
 +
 +            this.factories = factories;
 +
 +            if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions())
 +                throw new InvalidRequestException("the select clause must either contain only aggregates or no aggregate");
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return factories.getFunctions();
 +        }
 +
 +        @Override
 +        public int addColumnForOrdering(ColumnDefinition c)
 +        {
 +            int index = super.addColumnForOrdering(c);
 +            factories.addSelectorForOrdering(c, index);
 +            return index;
 +        }
 +
 +        public boolean isAggregate()
 +        {
 +            return factories.containsOnlyAggregateFunctions();
 +        }
 +
 +        protected Selectors newSelectors() throws InvalidRequestException
 +        {
 +            return new Selectors()
 +            {
 +                private final List<Selector> selectors = factories.newInstances();
 +
 +                public void reset()
 +                {
 +                    for (Selector selector : selectors)
 +                        selector.reset();
 +                }
 +
 +                public boolean isAggregate()
 +                {
 +                    return factories.containsOnlyAggregateFunctions();
 +                }
 +
 +                public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException
 +                {
 +                    List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
 +
 +                    for (Selector selector: selectors)
 +                        outputRow.add(selector.getOutput(protocolVersion));
 +
 +                    return outputRow;
 +                }
 +
 +                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
 +                {
 +                    for (Selector selector : selectors)
 +                        selector.addInput(protocolVersion, rs);
 +                }
 +            };
 +        }
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
index 0000000,0000000..e6c8979
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
@@@ -1,0 -1,0 +1,118 @@@
++package org.apache.cassandra.cql3.selection;
++
++import java.util.LinkedHashSet;
++import java.util.List;
++
++import com.google.common.base.Function;
++import com.google.common.base.Joiner;
++import com.google.common.base.Objects;
++import com.google.common.collect.*;
++
++import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.ColumnSpecification;
++
++/**
++ * Separately maintains the ColumnSpecifications and their mappings to underlying
++ * columns as we may receive null mappings. This occurs where a query result
++ * includes a column specification which does not map to any particular real
++ * column, e.g. COUNT queries or where no-arg functions like now() are used
++ */
++public class SelectionColumnMapping implements SelectionColumns
++{
++    // Uses a LinkedHashSet as both order and uniqueness need to be preserved
++    private final LinkedHashSet<ColumnSpecification> columnSpecifications;
++    private final HashMultimap<ColumnSpecification, ColumnDefinition> columnMappings;
++
++    private SelectionColumnMapping()
++    {
++        this.columnSpecifications = new LinkedHashSet<>();
++        this.columnMappings = HashMultimap.create();
++    }
++
++    protected static SelectionColumnMapping newMapping()
++    {
++        return new SelectionColumnMapping();
++    }
++
++    protected static SelectionColumnMapping simpleMapping(List<ColumnDefinition> columnDefinitions)
++    {
++        SelectionColumnMapping mapping = new SelectionColumnMapping();
++        for (ColumnDefinition def: columnDefinitions)
++            mapping.addMapping(def, def);
++        return mapping;
++    }
++
++    protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, ColumnDefinition column)
++    {
++        columnSpecifications.add(colSpec);
++        // some AbstractFunctionSelector impls do not map directly to an underlying column
++        // so don't record a mapping in that case
++        if (null != column)
++            columnMappings.put(colSpec, column);
++        return this;
++    }
++
++    public List<ColumnSpecification> getColumnSpecifications()
++    {
++        // return a mutable copy as we may add extra columns
++        // for ordering (CASSANDRA-4911 & CASSANDRA-8286)
++        return Lists.newArrayList(columnSpecifications);
++    }
++
++    public Multimap<ColumnSpecification, ColumnDefinition> getMappings()
++    {
++        return Multimaps.unmodifiableMultimap(columnMappings);
++    }
++
++    public boolean equals(Object obj)
++    {
++        if (obj == null)
++            return false;
++
++        if (!(obj instanceof SelectionColumnMapping))
++            return false;
++
++        return Objects.equal(this.columnMappings, ((SelectionColumnMapping)obj).columnMappings);
++    }
++
++    public int hashCode()
++    {
++        return Objects.hashCode(columnMappings);
++    }
++
++    public String toString()
++    {
++        final Function<ColumnDefinition, String> getDefName = new Function<ColumnDefinition, String>()
++        {
++            public String apply(ColumnDefinition def)
++            {
++                return def.name.toString();
++            }
++        };
++        final Function<ColumnSpecification, String> colSpecToMappingString = new Function<ColumnSpecification, String>()
++        {
++            public String apply(ColumnSpecification colSpec)
++            {
++                StringBuilder builder = new StringBuilder();
++                builder.append(colSpec.name.toString());
++                if (columnMappings.containsKey(colSpec))
++                {
++                    builder.append(":[");
++                    builder.append(Joiner.on(',').join(Iterables.transform(columnMappings.get(colSpec), getDefName)));
++                    builder.append("]");
++                }
++                else
++                {
++                    builder.append(":[]");
++                }
++                return builder.toString();
++            }
++        };
++
++        StringBuilder builder = new StringBuilder();
++        builder.append("{ ");
++        builder.append(Joiner.on(", ").join(Iterables.transform(columnSpecifications, colSpecToMappingString)));
++        builder.append(" }");
++        return builder.toString();
++    }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
index 0000000,0000000..af334e6
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
@@@ -1,0 -1,0 +1,18 @@@
++package org.apache.cassandra.cql3.selection;
++
++import java.util.List;
++
++import com.google.common.collect.Multimap;
++
++import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.ColumnSpecification;
++
++/**
++ * Represents a mapping between the actual columns used to satisfy a Selection
++ * and the column definitions included in the resultset metadata for the query.
++ */
++public interface SelectionColumns
++{
++    List<ColumnSpecification> getColumnSpecifications();
++    Multimap<ColumnSpecification, ColumnDefinition> getMappings();
++}