You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/06/18 18:51:24 UTC

[08/12] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selector.java
index 747dc60,0000000..9b7f0ba
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@@ -1,179 -1,0 +1,192 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +
 +import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.AssignmentTestable;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A <code>Selector</code> is used to convert the data returned by the storage engine into the data requested by the 
 + * user. They correspond to the &lt;selector&gt; elements from the select clause.
 + * <p>Since the introduction of aggregation, <code>Selector</code>s cannot be called anymore by multiple threads 
 + * as they have an internal state.</p>
 + */
 +public abstract class Selector implements AssignmentTestable
 +{
 +    /**
 +     * A factory for <code>Selector</code> instances.
 +     */
 +    public static abstract class Factory
 +    {
 +        public Iterable<Function> getFunctions()
 +        {
 +            return Collections.emptySet();
 +        }
 +
 +        /**
 +         * Returns the column specification corresponding to the output value of the selector instances created by
 +         * this factory.
 +         *
 +         * @param cfm the column family meta data
 +         * @return a column specification
 +         */
 +        public final ColumnSpecification getColumnSpecification(CFMetaData cfm)
 +        {
 +            return new ColumnSpecification(cfm.ksName,
 +                                           cfm.cfName,
 +                                           new ColumnIdentifier(getColumnName(), true),
 +                                           getReturnType());
 +        }
 +
 +        /**
 +         * Creates a new <code>Selector</code> instance.
 +         *
 +         * @return a new <code>Selector</code> instance
 +         */
 +        public abstract Selector newInstance() throws InvalidRequestException;
 +
 +        /**
 +         * Checks if this factory creates selectors instances that creates aggregates.
 +         *
 +         * @return <code>true</code> if this factory creates selectors instances that creates aggregates,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isAggregateSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Checks if this factory creates <code>writetime</code> selectors instances.
 +         *
 +         * @return <code>true</code> if this factory creates <code>writetime</code> selectors instances,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isWritetimeSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Checks if this factory creates <code>TTL</code> selectors instances.
 +         *
 +         * @return <code>true</code> if this factory creates <code>TTL</code> selectors instances,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isTTLSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Returns the name of the column corresponding to the output value of the selector instances created by
 +         * this factory.
 +         *
 +         * @return a column name
 +         */
 +        protected abstract String getColumnName();
 +
 +        /**
 +         * Returns the type of the values returned by the selector instances created by this factory.
 +         *
 +         * @return the selector output type
 +         */
 +        protected abstract AbstractType<?> getReturnType();
 +
++        /**
++         * Record a mapping between the ColumnDefinitions that are used by the selector
++         * instances created by this factory and a column in the ResultSet.Metadata
++         * returned with a query. In most cases, this is likely to be a 1:1 mapping,
++         * but some selector instances may utilise multiple columns (or none at all)
++         * to produce a value (i.e. functions).
++         *
++         * @param mapping the instance of the column mapping belonging to the current query's Selection
++         * @param resultsColumn the column in the ResultSet.Metadata to which the ColumnDefinitions used
++         *                      by the Selector are to be mapped
++         */
++        protected abstract void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultsColumn);
 +    }
 +
 +    /**
 +     * Add the current value from the specified <code>ResultSetBuilder</code>.
 +     *
 +     * @param protocolVersion protocol version used for serialization
 +     * @param rs the <code>ResultSetBuilder</code>
 +     * @throws InvalidRequestException if a problem occurs while add the input value
 +     */
 +    public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 +
 +    /**
 +     * Returns the selector output.
 +     *
 +     * @param protocolVersion protocol version used for serialization
 +     * @return the selector output
 +     * @throws InvalidRequestException if a problem occurs while computing the output value
 +     */
 +    public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException;
 +
 +    /**
 +     * Returns the <code>Selector</code> output type.
 +     *
 +     * @return the <code>Selector</code> output type.
 +     */
 +    public abstract AbstractType<?> getType();
 +
 +    /**
 +     * Checks if this <code>Selector</code> is creating aggregates.
 +     *
 +     * @return <code>true</code> if this <code>Selector</code> is creating aggregates <code>false</code>
 +     * otherwise.
 +     */
 +    public boolean isAggregate()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * Reset the internal state of this <code>Selector</code>.
 +     */
 +    public abstract void reset();
 +
 +    public final AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver)
 +    {
 +        // We should ignore the fact that the output type is frozen in our comparison as functions do not support
 +        // frozen types for arguments
 +        AbstractType<?> receiverType = receiver.type;
 +        if (getType().isFrozenCollection())
 +            receiverType = receiverType.freeze();
 +
 +        if (receiverType.equals(getType()))
 +            return AssignmentTestable.TestResult.EXACT_MATCH;
 +
 +        if (receiverType.isValueCompatibleWith(getType()))
 +            return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
 +
 +        return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index beb7399,0000000..81905e6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@@ -1,206 -1,0 +1,206 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.selection.Selector.Factory;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A set of <code>Selector</code> factories.
 + */
 +final class SelectorFactories implements Iterable<Selector.Factory>
 +{
 +    /**
 +     * The <code>Selector</code> factories.
 +     */
 +    private final List<Selector.Factory> factories;
 +
 +    /**
 +     * <code>true</code> if one of the factory creates writetime selectors.
 +     */
 +    private boolean containsWritetimeFactory;
 +
 +    /**
 +     * <code>true</code> if one of the factory creates TTL selectors.
 +     */
 +    private boolean containsTTLFactory;
 +
 +    /**
 +     * The number of factories creating aggregates.
 +     */
 +    private int numberOfAggregateFactories;
 +
 +    /**
 +     * Creates a new <code>SelectorFactories</code> instance and collect the column definitions.
 +     *
 +     * @param selectables the <code>Selectable</code>s for which the factories must be created
 +     * @param cfm the Column Family Definition
 +     * @param defs the collector parameter for the column definitions
 +     * @return a new <code>SelectorFactories</code> instance
 +     * @throws InvalidRequestException if a problem occurs while creating the factories
 +     */
 +    public static SelectorFactories createFactoriesAndCollectColumnDefinitions(List<Selectable> selectables,
 +                                                                               CFMetaData cfm,
 +                                                                               List<ColumnDefinition> defs)
 +                                                                               throws InvalidRequestException
 +    {
 +        return new SelectorFactories(selectables, cfm, defs);
 +    }
 +
 +    private SelectorFactories(List<Selectable> selectables,
 +                              CFMetaData cfm,
 +                              List<ColumnDefinition> defs)
 +                              throws InvalidRequestException
 +    {
 +        factories = new ArrayList<>(selectables.size());
 +
 +        for (Selectable selectable : selectables)
 +        {
 +            Factory factory = selectable.newSelectorFactory(cfm, defs);
 +            containsWritetimeFactory |= factory.isWritetimeSelectorFactory();
 +            containsTTLFactory |= factory.isTTLSelectorFactory();
 +            if (factory.isAggregateSelectorFactory())
 +                ++numberOfAggregateFactories;
 +            factories.add(factory);
 +        }
 +    }
 +
 +    public Iterable<Function> getFunctions()
 +    {
 +        Iterable<Function> functions = Collections.emptySet();
 +        for (Factory factory : factories)
 +            if (factory != null)
 +                functions = Iterables.concat(functions, factory.getFunctions());
 +        return functions;
 +    }
 +
 +    /**
 +     * Adds a new <code>Selector.Factory</code> for a column that is needed only for ORDER BY purposes.
 +     * @param def the column that is needed for ordering
 +     * @param index the index of the column definition in the Selection's list of columns
 +     */
 +    public void addSelectorForOrdering(ColumnDefinition def, int index)
 +    {
-         factories.add(SimpleSelector.newFactory(def.name.toString(), index, def.type));
++        factories.add(SimpleSelector.newFactory(def, index));
 +    }
 +
 +    /**
 +     * Checks if this <code>SelectorFactories</code> contains only factories for aggregates.
 +     *
 +     * @return <code>true</code> if this <code>SelectorFactories</code> contains only factories for aggregates,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean containsOnlyAggregateFunctions()
 +    {
 +        int size = factories.size();
 +        return  size != 0 && numberOfAggregateFactories == size;
 +    }
 +
 +    /**
 +     * Whether the selector built by this factory does aggregation or not (either directly or in a sub-selector).
 +     *
 +     * @return <code>true</code> if the selector built by this factor does aggregation, <code>false</code> otherwise.
 +     */
 +    public boolean doesAggregation()
 +    {
 +        return numberOfAggregateFactories > 0;
 +    }
 +
 +    /**
 +     * Checks if this <code>SelectorFactories</code> contains at least one factory for writetime selectors.
 +     *
 +     * @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for writetime
 +     * selectors, <code>false</code> otherwise.
 +     */
 +    public boolean containsWritetimeSelectorFactory()
 +    {
 +        return containsWritetimeFactory;
 +    }
 +
 +    /**
 +     * Checks if this <code>SelectorFactories</code> contains at least one factory for TTL selectors.
 +     *
 +     * @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for TTL
 +     * selectors, <code>false</code> otherwise.
 +     */
 +    public boolean containsTTLSelectorFactory()
 +    {
 +        return containsTTLFactory;
 +    }
 +
 +    /**
 +     * Creates a list of new <code>Selector</code> instances.
 +     * @return a list of new <code>Selector</code> instances.
 +     */
 +    public List<Selector> newInstances() throws InvalidRequestException
 +    {
 +        List<Selector> selectors = new ArrayList<>(factories.size());
 +        for (Selector.Factory factory : factories)
 +        {
 +            selectors.add(factory.newInstance());
 +        }
 +        return selectors;
 +    }
 +
 +    public Iterator<Factory> iterator()
 +    {
 +        return factories.iterator();
 +    }
 +
 +    /**
 +     * Returns the names of the columns corresponding to the output values of the selector instances created by
 +     * these factories.
 +     *
 +     * @return a list of column names
 +     */
 +    public List<String> getColumnNames()
 +    {
 +        return Lists.transform(factories, new com.google.common.base.Function<Selector.Factory, String>()
 +        {
 +            public String apply(Selector.Factory factory)
 +            {
 +                return factory.getColumnName();
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Returns a list of the return types of the selector instances created by these factories.
 +     *
 +     * @return a list of types
 +     */
 +    public List<AbstractType<?>> getReturnTypes()
 +    {
 +        return Lists.transform(factories, new com.google.common.base.Function<Selector.Factory, AbstractType<?>>()
 +        {
 +            public AbstractType<?> apply(Selector.Factory factory)
 +            {
 +                return factory.getReturnType();
 +            }
 +        });
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index c2edaed,0000000..6c4dc04
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@@ -1,93 -1,0 +1,100 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +
++import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +public final class SimpleSelector extends Selector
 +{
 +    private final String columnName;
 +    private final int idx;
 +    private final AbstractType<?> type;
 +    private ByteBuffer current;
 +
-     public static Factory newFactory(final String columnName, final int idx, final AbstractType<?> type)
++    public static Factory newFactory(final ColumnDefinition def, final int idx)
 +    {
 +        return new Factory()
 +        {
 +            @Override
 +            protected String getColumnName()
 +            {
-                 return columnName;
++                return def.name.toString();
 +            }
 +
 +            @Override
 +            protected AbstractType<?> getReturnType()
 +            {
-                 return type;
++                return def.type;
++            }
++
++            protected void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultColumn)
++            {
++               mapping.addMapping(resultColumn, def);
 +            }
 +
 +            @Override
 +            public Selector newInstance()
 +            {
-                 return new SimpleSelector(columnName, idx, type);
++                return new SimpleSelector(def.name.toString(), idx, def.type);
 +            }
 +        };
 +    }
 +
 +    @Override
 +    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
 +    {
 +        current = rs.current.get(idx);
 +    }
 +
 +    @Override
 +    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
 +    {
 +        return current;
 +    }
 +
 +    @Override
 +    public void reset()
 +    {
 +        current = null;
 +    }
 +
 +    @Override
 +    public AbstractType<?> getType()
 +    {
 +        return type;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return columnName;
 +    }
 +
 +    private SimpleSelector(String columnName, int idx, AbstractType<?> type)
 +    {
 +        this.columnName = columnName;
 +        this.idx = idx;
 +        this.type = type;
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index a1ecd3d,0000000..b3607f3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@@ -1,108 -1,0 +1,116 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
++import java.util.Collections;
 +
++import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.db.marshal.LongType;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +final class WritetimeOrTTLSelector extends Selector
 +{
 +    private final String columnName;
 +    private final int idx;
 +    private final boolean isWritetime;
 +    private ByteBuffer current;
 +
-     public static Factory newFactory(final String columnName, final int idx, final boolean isWritetime)
++    public static Factory newFactory(final ColumnDefinition def, final int idx, final boolean isWritetime)
 +    {
 +        return new Factory()
 +        {
 +            protected String getColumnName()
 +            {
-                 return String.format("%s(%s)", isWritetime ? "writetime" : "ttl", columnName);
++                return String.format("%s(%s)", isWritetime ? "writetime" : "ttl", def.name.toString());
 +            }
 +
 +            protected AbstractType<?> getReturnType()
 +            {
 +                return isWritetime ? LongType.instance : Int32Type.instance;
 +            }
 +
++            protected void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultsColumn)
++            {
++               mapping.addMapping(resultsColumn, def);
++            }
++
 +            public Selector newInstance()
 +            {
-                 return new WritetimeOrTTLSelector(columnName, idx, isWritetime);
++                return new WritetimeOrTTLSelector(def.name.toString(), idx, isWritetime);
 +            }
 +
 +            public boolean isWritetimeSelectorFactory()
 +            {
 +                return isWritetime;
 +            }
 +
 +            public boolean isTTLSelectorFactory()
 +            {
 +                return !isWritetime;
 +            }
 +        };
 +    }
 +
 +    public void addInput(int protocolVersion, ResultSetBuilder rs)
 +    {
 +        if (isWritetime)
 +        {
 +            long ts = rs.timestamps[idx];
 +            current = ts != Long.MIN_VALUE ? ByteBufferUtil.bytes(ts) : null;
 +        }
 +        else
 +        {
 +            int ttl = rs.ttls[idx];
 +            current = ttl > 0 ? ByteBufferUtil.bytes(ttl) : null;
 +        }
 +    }
 +
 +    public ByteBuffer getOutput(int protocolVersion)
 +    {
 +        return current;
 +    }
 +
 +    public void reset()
 +    {
 +        current = null;
 +    }
 +
 +    public AbstractType<?> getType()
 +    {
 +        return isWritetime ? LongType.instance : Int32Type.instance;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return columnName;
 +    }
 +
 +    private WritetimeOrTTLSelector(String columnName, int idx, boolean isWritetime)
 +    {
 +        this.columnName = columnName;
 +        this.idx = idx;
 +        this.isWritetime = isWritetime;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index dfb0d07,d0566eb..8ce555f
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -288,9 -341,11 +292,25 @@@ public class SelectStatement implement
          return cfm.cfName;
      }
  
++    /**
++     * May be used by custom QueryHandler implementations
++     */
++    public Selection getSelection()
++    {
++        return selection;
++    }
++
++    /**
++     * May be used by custom QueryHandler implementations
++     */
++    public StatementRestrictions getRestrictions()
++    {
++        return restrictions;
++    }
++
      private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
      {
 -        Collection<ByteBuffer> keys = getKeys(options);
 -        if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
 -            return null;
 +        Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
  
          List<ReadCommand> commands = new ArrayList<>(keys.size());
  
@@@ -453,14 -568,18 +473,17 @@@
          return new SliceQueryFilter(slices, isReversed, limit, toGroup);
      }
  
-     private int getLimit(QueryOptions options) throws InvalidRequestException
+     /**
+      * May be used by custom QueryHandler implementations
+      */
+     public int getLimit(QueryOptions options) throws InvalidRequestException
      {
 -        int l = Integer.MAX_VALUE;
          if (limit != null)
          {
 -            ByteBuffer b = limit.bindAndGet(options);
 -            if (b == null)
 -                throw new InvalidRequestException("Invalid null value of limit");
 -
 +            ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
 +            // treat UNSET limit value as 'unlimited'
 +            if (b == UNSET_BYTE_BUFFER)
 +                return Integer.MAX_VALUE;
              try
              {
                  Int32Type.instance.validate(b);
@@@ -530,18 -853,300 +553,21 @@@
          }
      }
  
 -    /** Returns true if a non-frozen collection is selected, false otherwise. */
 -    private boolean selectACollection()
 -    {
 -        if (!cfm.comparator.hasCollections())
 -            return false;
 -
 -        for (ColumnDefinition def : selection.getColumns())
 -        {
 -            if (def.type.isCollection() && def.type.isMultiCell())
 -                return true;
 -        }
 -
 -        return false;
 -    }
 -
 -    @VisibleForTesting
 -    static List<Composite> buildBound(Bound bound,
 -                                      List<ColumnDefinition> defs,
 -                                      Restriction[] restrictions,
 -                                      boolean isReversed,
 -                                      CType type,
 -                                      QueryOptions options) throws InvalidRequestException
 -    {
 -        CBuilder builder = type.builder();
 -
 -        // The end-of-component of composite doesn't depend on whether the
 -        // component type is reversed or not (i.e. the ReversedType is applied
 -        // to the component comparator but not to the end-of-component itself),
 -        // it only depends on whether the slice is reversed
 -        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
 -        for (int i = 0, m = defs.size(); i < m; i++)
 -        {
 -            ColumnDefinition def = defs.get(i);
 -
 -            // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
 -            // So if we're doing a reverse slice, we must inverse the bounds when giving them as start and end of the slice filter.
 -            // But if the actual comparator itself is reversed, we must inversed the bounds too.
 -            Bound b = isReversed == isReversedType(def) ? bound : Bound.reverse(bound);
 -            Restriction r = restrictions[def.position()];
 -            if (isNullRestriction(r, b) || !r.canEvaluateWithSlices())
 -            {
 -                // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
 -                // For composites, if there was preceding component and we're computing the end, we must change the last component
 -                // End-Of-Component, otherwise we would be selecting only one record.
 -                Composite prefix = builder.build();
 -                return Collections.singletonList(eocBound == Bound.END ? prefix.end() : prefix.start());
 -            }
 -            if (r.isSlice())
 -            {
 -                if (r.isMultiColumn())
 -                {
 -                    MultiColumnRestriction.Slice slice = (MultiColumnRestriction.Slice) r;
 -
 -                    if (!slice.hasBound(b))
 -                    {
 -                        Composite prefix = builder.build();
 -                        return Collections.singletonList(builder.remainingCount() > 0 && eocBound == Bound.END
 -                                ? prefix.end()
 -                                : prefix);
 -                    }
 -
 -                    List<ByteBuffer> vals = slice.componentBounds(b, options);
 -
 -                    for (int j = 0, n = vals.size(); j < n; j++)
 -                        addValue(builder, defs.get(i + j), vals.get(j)) ;
 -                }
 -                else
 -                {
 -                    builder.add(getSliceValue(r, b, options));
 -                }
 -                Operator relType = ((Restriction.Slice)r).getRelation(eocBound, b);
 -                return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
 -            }
 -
 -            if (r.isIN())
 -            {
 -                // The IN query might not have listed the values in comparator order, so we need to re-sort
 -                // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
 -                TreeSet<Composite> inValues = new TreeSet<>(isReversed ? type.reverseComparator() : type);
 -
 -                if (r.isMultiColumn())
 -                {
 -                    List<List<ByteBuffer>> splitInValues = ((MultiColumnRestriction.IN) r).splitValues(options);
 -
 -                    for (List<ByteBuffer> components : splitInValues)
 -                    {
 -                        for (int j = 0; j < components.size(); j++)
 -                            if (components.get(j) == null)
 -                                throw new InvalidRequestException("Invalid null value in condition for column " + defs.get(i + j).name);
 -
 -                        Composite prefix = builder.buildWith(components);
 -                        inValues.add(builder.remainingCount() == 0 ? prefix : addEOC(prefix, eocBound));
 -                    }
 -                    return new ArrayList<>(inValues);
 -                }
 -
 -                List<ByteBuffer> values = r.values(options);
 -                if (values.size() != 1)
 -                {
 -                    // IN query, we only support it on the clustering columns
 -                    assert def.position() == defs.size() - 1;
 -                    for (ByteBuffer val : values)
 -                    {
 -                        if (val == null)
 -                            throw new InvalidRequestException(String.format("Invalid null value in condition for column %s",
 -                                                                            def.name));
 -                        Composite prefix = builder.buildWith(val);
 -                        // See below for why this
 -                        inValues.add(builder.remainingCount() == 0 ? prefix : addEOC(prefix, eocBound));
 -                    }
 -                    return new ArrayList<>(inValues);
 -                }
 -            }
 -
 -            List<ByteBuffer> values = r.values(options);
 -
 -            if (r.isMultiColumn())
 -            {
 -                for (int j = 0; j < values.size(); j++)
 -                    addValue(builder, defs.get(i + j), values.get(j));
 -                i += values.size() - 1; // skips the processed columns
 -            }
 -            else
 -            {
 -                addValue(builder, def, values.get(0));
 -            }
 -        }
 -        // Means no relation at all or everything was an equal
 -        // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
 -        // it would be harmless to do it. However, we use this method got the partition key too. And when a query
 -        // with 2ndary index is done, and with the the partition provided with an EQ, we'll end up here, and in that
 -        // case using the eoc would be bad, since for the random partitioner we have no guarantee that
 -        // prefix.end() will sort after prefix (see #5240).
 -        Composite prefix = builder.build();
 -        return Collections.singletonList(builder.remainingCount() == 0 ? prefix : addEOC(prefix, eocBound));
 -    }
 -
 -    /**
 -     * Adds an EOC to the specified Composite.
 -     *
 -     * @param composite the composite
 -     * @param eocBound the EOC bound
 -     * @return a new <code>Composite</code> with the EOC corresponding to the eocBound
 -     */
 -    private static Composite addEOC(Composite composite, Bound eocBound)
 -    {
 -        return eocBound == Bound.END ? composite.end() : composite.start();
 -    }
 -
 -    /**
 -     * Adds the specified value to the specified builder
 -     *
 -     * @param builder the CBuilder to which the value must be added
 -     * @param def the column associated to the value
 -     * @param value the value to add
 -     * @throws InvalidRequestException if the value is null
 -     */
 -    private static void addValue(CBuilder builder, ColumnDefinition def, ByteBuffer value) throws InvalidRequestException
 -    {
 -        if (value == null)
 -            throw new InvalidRequestException(String.format("Invalid null value in condition for column %s", def.name));
 -        builder.add(value);
 -    }
 -
 -    private static Composite.EOC eocForRelation(Operator op)
 -    {
 -        switch (op)
 -        {
 -            case LT:
 -                // < X => using startOf(X) as finish bound
 -                return Composite.EOC.START;
 -            case GT:
 -            case LTE:
 -                // > X => using endOf(X) as start bound
 -                // <= X => using endOf(X) as finish bound
 -                return Composite.EOC.END;
 -            default:
 -                // >= X => using X as start bound (could use START_OF too)
 -                // = X => using X
 -                return Composite.EOC.NONE;
 -        }
 -    }
 -
 -    private static boolean isNullRestriction(Restriction r, Bound b)
 -    {
 -        return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
 -    }
 -
 -    private static ByteBuffer getSliceValue(Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
 -    {
 -        Restriction.Slice slice = (Restriction.Slice)r;
 -        assert slice.hasBound(b);
 -        ByteBuffer val = slice.bound(b, options);
 -        if (val == null)
 -            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", r));
 -        return val;
 -    }
 -
 -    private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
 -    {
 -        assert isColumnRange();
 -        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
 -    }
 -
+     /**
+      * May be used by custom QueryHandler implementations
+      */
      public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException
      {
 -        if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
 +        if (!restrictions.usesSecondaryIndexing())
              return Collections.emptyList();
  
 -        List<IndexExpression> expressions = new ArrayList<IndexExpression>();
 -        for (ColumnDefinition def : restrictedColumns.keySet())
 -        {
 -            Restriction restriction;
 -            switch (def.kind)
 -            {
 -                case PARTITION_KEY:
 -                    restriction = keyRestrictions[def.position()];
 -                    break;
 -                case CLUSTERING_COLUMN:
 -                    restriction = columnRestrictions[def.position()];
 -                    break;
 -                case REGULAR:
 -                case STATIC:
 -                    restriction = metadataRestrictions.get(def.name);
 -                    break;
 -                default:
 -                    // We don't allow restricting a COMPACT_VALUE for now in prepare.
 -                    throw new AssertionError();
 -            }
 -
 -            if (restriction.isSlice())
 -            {
 -                Restriction.Slice slice = (Restriction.Slice)restriction;
 -                for (Bound b : Bound.values())
 -                {
 -                    if (slice.hasBound(b))
 -                    {
 -                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
 -                        Operator op = slice.getIndexOperator(b);
 -                        // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operation
 -                        // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
 -                        // use the underlying comparator as is.
 -                        if (def.type instanceof ReversedType)
 -                            op = reverse(op);
 -                        expressions.add(new IndexExpression(def.name.bytes, op, value));
 -                    }
 -                }
 -            }
 -            else if (restriction.isContains())
 -            {
 -                SingleColumnRestriction.Contains contains = (SingleColumnRestriction.Contains)restriction;
 -                for (ByteBuffer value : contains.values(options))
 -                {
 -                    validateIndexedValue(def, value);
 -                    expressions.add(new IndexExpression(def.name.bytes, Operator.CONTAINS, value));
 -                }
 -                for (ByteBuffer key : contains.keys(options))
 -                {
 -                    validateIndexedValue(def, key);
 -                    expressions.add(new IndexExpression(def.name.bytes, Operator.CONTAINS_KEY, key));
 -                }
 -            }
 -            else
 -            {
 -                ByteBuffer value;
 -                if (restriction.isMultiColumn())
 -                {
 -                    List<ByteBuffer> values = restriction.values(options);
 -                    value = values.get(def.position());
 -                }
 -                else
 -                {
 -                    List<ByteBuffer> values = restriction.values(options);
 -                    if (values.size() != 1)
 -                        throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
 +        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
  
 -                    value = values.get(0);
 -                }
 +        List<IndexExpression> expressions = restrictions.getIndexExpressions(secondaryIndexManager, options);
  
 -                validateIndexedValue(def, value);
 -                expressions.add(new IndexExpression(def.name.bytes, Operator.EQ, value));
 -            }
 -        }
 +        secondaryIndexManager.validateIndexSearchersForQuery(expressions);
  
 -        if (usesSecondaryIndexing)
 -        {
 -            ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
 -            SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
 -            secondaryIndexManager.validateIndexSearchersForQuery(expressions);
 -        }
 -        
          return expressions;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f8516da/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 0000000,0000000..5bacf0d
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@@ -1,0 -1,0 +1,353 @@@
++package org.apache.cassandra.cql3.selection;
++
++import java.util.Collections;
++
++import com.google.common.collect.ImmutableList;
++import org.junit.Test;
++
++import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.config.Schema;
++import org.apache.cassandra.cql3.*;
++import org.apache.cassandra.cql3.statements.SelectStatement;
++import org.apache.cassandra.db.marshal.*;
++import org.apache.cassandra.exceptions.RequestValidationException;
++import org.apache.cassandra.service.ClientState;
++import org.apache.cassandra.utils.ByteBufferUtil;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertTrue;
++
++public class SelectionColumnMappingTest extends CQLTester
++{
++    String tableName;
++    String typeName;
++    UserType userType;
++    String functionName;
++
++    @Test
++    public void testSelectionColumnMapping() throws Throwable
++    {
++        // Organised as a single test to avoid the overhead of
++        // table creation for each variant
++
++        typeName = createType("CREATE TYPE %s (f1 int, f2 text)");
++        tableName = createTable("CREATE TABLE %s (" +
++                                    " k int PRIMARY KEY," +
++                                    " v1 int," +
++                                    " v2 ascii," +
++                                    " v3 frozen<" + typeName + ">)");
++        userType = Schema.instance.getKSMetaData(KEYSPACE).userTypes.getType(ByteBufferUtil.bytes(typeName));
++        functionName = createFunction(KEYSPACE, "int, ascii",
++                                      "CREATE FUNCTION %s (i int, a ascii) " +
++                                      "CALLED ON NULL INPUT " +
++                                      "RETURNS int " +
++                                      "LANGUAGE java " +
++                                      "AS 'return Integer.valueOf(i);'");
++        testSimpleTypes();
++        testWildcard();
++        testSimpleTypesWithAliases();
++        testUserTypes();
++        testUserTypesWithAliases();
++        testWritetimeAndTTL();
++        testWritetimeAndTTLWithAliases();
++        testFunction();
++        testNoArgFunction();
++        testUserDefinedFunction();
++        testOverloadedFunction();
++        testFunctionWithAlias();
++        testMultipleAliasesOnSameColumn();
++        testCount();
++        testMixedColumnTypes();
++    }
++
++    @Test
++    public void testMultipleArgumentFunction() throws Throwable
++    {
++        // demonstrate behaviour of token() with composite partition key
++        tableName = createTable("CREATE TABLE %s (a int, b text, PRIMARY KEY ((a, b)))");
++        ColumnSpecification tokenSpec = columnSpecification("system.token(a, b)", BytesType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(tokenSpec, columnDefinition("a"))
++                                                                .addMapping(tokenSpec, columnDefinition("b"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT token(a,b) FROM %s"));
++    }
++
++    private void testSimpleTypes() throws Throwable
++    {
++        // simple column identifiers without aliases are represented in
++        // ResultSet.Metadata by the underlying ColumnDefinition
++        ColumnSpecification kSpec = columnSpecification("k", Int32Type.instance);
++        ColumnSpecification v1Spec = columnSpecification("v1", Int32Type.instance);
++        ColumnSpecification v2Spec = columnSpecification("v2", AsciiType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(kSpec, columnDefinition("k"))
++                                                                .addMapping(v1Spec, columnDefinition("v1"))
++                                                                .addMapping(v2Spec, columnDefinition("v2"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT k, v1, v2 FROM %s"));
++    }
++
++    private void testWildcard() throws Throwable
++    {
++        // Wildcard select represents each column in the table with a ColumnDefinition
++        // in the ResultSet metadata
++        ColumnDefinition kSpec = columnDefinition("k");
++        ColumnDefinition v1Spec = columnDefinition("v1");
++        ColumnDefinition v2Spec = columnDefinition("v2");
++        ColumnDefinition v3Spec = columnDefinition("v3");
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(kSpec, columnDefinition("k"))
++                                                                .addMapping(v1Spec, columnDefinition("v1"))
++                                                                .addMapping(v2Spec, columnDefinition("v2"))
++                                                                .addMapping(v3Spec, columnDefinition("v3"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT * FROM %s"));
++    }
++
++    private void testSimpleTypesWithAliases() throws Throwable
++    {
++        // simple column identifiers with aliases are represented in ResultSet.Metadata
++        // by a ColumnSpecification based on the underlying ColumnDefinition
++        ColumnSpecification kSpec = columnSpecification("k_alias", Int32Type.instance);
++        ColumnSpecification v1Spec = columnSpecification("v1_alias", Int32Type.instance);
++        ColumnSpecification v2Spec = columnSpecification("v2_alias", AsciiType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(kSpec, columnDefinition("k"))
++                                                                .addMapping(v1Spec, columnDefinition("v1"))
++                                                                .addMapping(v2Spec, columnDefinition("v2"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT k AS k_alias, v1 AS v1_alias, v2 AS v2_alias FROM %s"));
++    }
++
++    private void testUserTypes() throws Throwable
++    {
++        // User type fields are represented in ResultSet.Metadata by a
++        // ColumnSpecification denoting the name and type of the particular field
++        ColumnSpecification f1Spec = columnSpecification("v3.f1", Int32Type.instance);
++        ColumnSpecification f2Spec = columnSpecification("v3.f2", UTF8Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(f1Spec, columnDefinition("v3"))
++                                                                .addMapping(f2Spec, columnDefinition("v3"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT v3.f1, v3.f2 FROM %s"));
++    }
++
++    private void testUserTypesWithAliases() throws Throwable
++    {
++        // User type fields with aliases are represented in ResultSet.Metadata
++        // by a ColumnSpecification with the alias name and the type of the actual field
++        ColumnSpecification f1Spec = columnSpecification("f1_alias", Int32Type.instance);
++        ColumnSpecification f2Spec = columnSpecification("f2_alias", UTF8Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(f1Spec, columnDefinition("v3"))
++                                                                .addMapping(f2Spec, columnDefinition("v3"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT v3.f1 AS f1_alias, v3.f2 AS f2_alias FROM %s"));
++    }
++
++    private void testWritetimeAndTTL() throws Throwable
++    {
++        // writetime and ttl are represented in ResultSet.Metadata by a ColumnSpecification
++        // with the function name plus argument and a long or int type respectively
++        ColumnSpecification wtSpec = columnSpecification("writetime(v1)", LongType.instance);
++        ColumnSpecification ttlSpec = columnSpecification("ttl(v2)", Int32Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(wtSpec, columnDefinition("v1"))
++                                                                .addMapping(ttlSpec, columnDefinition("v2"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT writetime(v1), ttl(v2) FROM %s"));
++    }
++
++    private void testWritetimeAndTTLWithAliases() throws Throwable
++    {
++        // writetime and ttl with aliases are represented in ResultSet.Metadata
++        // by a ColumnSpecification with the alias name and the appropriate numeric type
++        ColumnSpecification wtSpec = columnSpecification("wt_alias", LongType.instance);
++        ColumnSpecification ttlSpec = columnSpecification("ttl_alias", Int32Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(wtSpec, columnDefinition("v1"))
++                                                                .addMapping(ttlSpec, columnDefinition("v2"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT writetime(v1) AS wt_alias, ttl(v2) AS ttl_alias FROM %s"));
++    }
++
++    private void testFunction() throws Throwable
++    {
++        // a function such as intasblob(<col>) is represented in ResultSet.Metadata
++        // by a ColumnSpecification with the function name plus args and the type set
++        // to the function's return type
++        ColumnSpecification fnSpec = columnSpecification("system.intasblob(v1)", BytesType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(fnSpec, columnDefinition("v1"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT intasblob(v1) FROM %s"));
++    }
++
++    private void testNoArgFunction() throws Throwable
++    {
++        // a no-arg function such as now() is represented in ResultSet.Metadata
++        // but has no mapping to any underlying column
++        ColumnSpecification fnSpec = columnSpecification("system.now()", TimeUUIDType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping().addMapping(fnSpec, null);
++
++        SelectionColumns actual = extractColumnMappingFromSelect("SELECT now() FROM %s");
++        assertEquals(expected, actual);
++        assertEquals(Collections.singletonList(fnSpec), actual.getColumnSpecifications());
++        assertTrue(actual.getMappings().isEmpty());
++    }
++
++    private void testOverloadedFunction() throws Throwable
++    {
++        String fnName = createFunction(KEYSPACE, "int",
++                                       "CREATE FUNCTION %s (input int) " +
++                                       "RETURNS NULL ON NULL INPUT " +
++                                       "RETURNS text " +
++                                       "LANGUAGE java " +
++                                       "AS 'return \"Hello World\";'");
++        createFunctionOverload(fnName, "text",
++                               "CREATE FUNCTION %s (input text) " +
++                               "RETURNS NULL ON NULL INPUT " +
++                               "RETURNS text " +
++                               "LANGUAGE java " +
++                               "AS 'return \"Hello World\";'");
++
++        createFunctionOverload(fnName, "int, text",
++                               "CREATE FUNCTION %s (input1 int, input2 text) " +
++                               "RETURNS NULL ON NULL INPUT " +
++                               "RETURNS text " +
++                               "LANGUAGE java " +
++                               "AS 'return \"Hello World\";'");
++        ColumnSpecification fnSpec1 = columnSpecification(fnName + "(v1)", UTF8Type.instance);
++        ColumnSpecification fnSpec2 = columnSpecification(fnName + "(v2)", UTF8Type.instance);
++        ColumnSpecification fnSpec3 = columnSpecification(fnName + "(v1, v2)", UTF8Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(fnSpec1, columnDefinition("v1"))
++                                                                .addMapping(fnSpec2, columnDefinition("v2"))
++                                                                .addMapping(fnSpec3, columnDefinition("v1"))
++                                                                .addMapping(fnSpec3, columnDefinition("v2"));
++
++        String select = String.format("SELECT %1$s(v1), %1$s(v2), %1$s(v1, v2) FROM %%s", fnName);
++        SelectionColumns actual = extractColumnMappingFromSelect(select);
++
++        assertEquals(expected, actual);
++        assertEquals(ImmutableList.of(fnSpec1, fnSpec2, fnSpec3), actual.getColumnSpecifications());
++    }
++
++    private void testCount() throws Throwable
++    {
++        // SELECT COUNT does not necessarily include any mappings, but it must always return
++        // a singleton list from getColumnSpecifications() in order for the ResultSet.Metadata
++        // to be constructed correctly:
++        // * COUNT(*) / COUNT(1) do not generate any mappings, as no specific columns are referenced
++        // * COUNT(foo) does generate a mapping from the 'system.count' column spec to foo
++        ColumnSpecification count = columnSpecification("count", LongType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(count, null);
++
++        SelectionColumns actual = extractColumnMappingFromSelect("SELECT COUNT(*) FROM %s");
++        assertEquals(expected, actual);
++        assertEquals(Collections.singletonList(count), actual.getColumnSpecifications());
++        assertTrue(actual.getMappings().isEmpty());
++
++        actual = extractColumnMappingFromSelect("SELECT COUNT(1) FROM %s");
++        assertEquals(expected, actual);
++        assertEquals(Collections.singletonList(count), actual.getColumnSpecifications());
++        assertTrue(actual.getMappings().isEmpty());
++
++        ColumnSpecification countV1 = columnSpecification("system.count(v1)", LongType.instance);
++        expected = SelectionColumnMapping.newMapping()
++                                         .addMapping(countV1, columnDefinition("v1"));
++        actual = extractColumnMappingFromSelect("SELECT COUNT(v1) FROM %s");
++        assertEquals(expected, actual);
++        assertEquals(Collections.singletonList(countV1), actual.getColumnSpecifications());
++        assertFalse(actual.getMappings().isEmpty());
++    }
++
++    private void testUserDefinedFunction() throws Throwable
++    {
++        // UDFs are basically represented in the same way as system functions
++        String functionCall = String.format("%s(v1, v2)", functionName);
++        ColumnSpecification fnSpec = columnSpecification(functionCall, Int32Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(fnSpec, columnDefinition("v1"))
++                                                                .addMapping(fnSpec, columnDefinition("v2"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT " + functionCall + " FROM %s"));
++    }
++
++    private void testFunctionWithAlias() throws Throwable
++    {
++        // a function with an alias is represented in ResultSet.Metadata by a
++        // ColumnSpecification with the alias and the type set to the function's
++        // return type
++        ColumnSpecification fnSpec = columnSpecification("fn_alias", BytesType.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(fnSpec, columnDefinition("v1"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT intasblob(v1) AS fn_alias FROM %s"));
++    }
++
++    private void testMultipleAliasesOnSameColumn() throws Throwable
++    {
++        // Multiple result columns derived from the same underlying column are
++        // represented by ColumnSpecifications
++        ColumnSpecification alias1 = columnSpecification("alias_1", Int32Type.instance);
++        ColumnSpecification alias2 = columnSpecification("alias_2", Int32Type.instance);
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(alias1, columnDefinition("v1"))
++                                                                .addMapping(alias2, columnDefinition("v1"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT v1 AS alias_1, v1 AS alias_2 FROM %s"));
++    }
++
++    private void testMixedColumnTypes() throws Throwable
++    {
++        ColumnSpecification kSpec = columnSpecification("k_alias", Int32Type.instance);
++        ColumnSpecification v1Spec = columnSpecification("writetime(v1)", LongType.instance);
++        ColumnSpecification v2Spec = columnSpecification("ttl_alias", Int32Type.instance);
++        ColumnSpecification f1Spec = columnSpecification("v3.f1", Int32Type.instance);
++        ColumnSpecification f2Spec = columnSpecification("f2_alias", UTF8Type.instance);
++        ColumnSpecification f3Spec = columnSpecification("v3", userType);
++
++        SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
++                                                                .addMapping(kSpec, columnDefinition("k"))
++                                                                .addMapping(v1Spec, columnDefinition("v1"))
++                                                                .addMapping(v2Spec, columnDefinition("v2"))
++                                                                .addMapping(f1Spec, columnDefinition("v3"))
++                                                                .addMapping(f2Spec, columnDefinition("v3"))
++                                                                .addMapping(f3Spec, columnDefinition("v3"));
++
++        assertEquals(expected, extractColumnMappingFromSelect("SELECT k AS k_alias," +
++                                                              "       writetime(v1)," +
++                                                              "       ttl(v2) as ttl_alias," +
++                                                              "       v3.f1," +
++                                                              "       v3.f2 AS f2_alias," +
++                                                              "       v3" +
++                                                              " FROM %s"));
++    }
++
++    private SelectionColumns extractColumnMappingFromSelect(String query) throws RequestValidationException
++    {
++        CQLStatement statement = QueryProcessor.getStatement(String.format(query, KEYSPACE + "." + tableName),
++                                                             ClientState.forInternalCalls()).statement;
++        assertTrue(statement instanceof SelectStatement);
++        return ((SelectStatement)statement).getSelection().getColumnMapping();
++    }
++
++    private ColumnDefinition columnDefinition(String name)
++    {
++        return Schema.instance.getCFMetaData(KEYSPACE, tableName)
++                              .getColumnDefinition(new ColumnIdentifier(name, true));
++
++    }
++
++    private ColumnSpecification columnSpecification(String name, AbstractType<?> type)
++    {
++        return new ColumnSpecification(KEYSPACE,
++                                       tableName,
++                                       new ColumnIdentifier(name, true),
++                                       type);
++    }
++}