You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/07/16 22:53:21 UTC

cassandra git commit: Allow the selection of columns together with aggregates

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 d97fc9b77 -> 2b99b5d35


Allow the selection of columns together with aggregates

Patch by Benjamin Lerer; reviewed by Robert Stupp for
CASSANDRA-9767


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2b99b5d3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2b99b5d3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2b99b5d3

Branch: refs/heads/cassandra-2.2
Commit: 2b99b5d3542f8695e419e851c2f07758ef46373d
Parents: d97fc9b
Author: blerer <be...@datastax.com>
Authored: Thu Jul 16 22:14:45 2015 +0200
Committer: blerer <be...@datastax.com>
Committed: Thu Jul 16 22:51:40 2015 +0200

----------------------------------------------------------------------
 NEWS.txt                                        |   4 +
 doc/cql3/CQL.textile                            |   5 +-
 .../selection/AbstractFunctionSelector.java     |   9 +-
 .../cassandra/cql3/selection/Selection.java     |   7 +-
 .../cql3/selection/SelectorFactories.java       |  12 --
 .../cql3/selection/SimpleSelector.java          |   8 +-
 .../cql3/selection/WritetimeOrTTLSelector.java  |   8 +-
 .../validation/operations/AggregationTest.java  | 135 +++++++++++++++++--
 8 files changed, 150 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index e114944..e9c6ef8 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Selecting columns, scalar functions, UDT fields, writetime or ttl together
+     with aggregates is now possible. The values returned for the columns,
+     scalar functions, UDT fields, writetime and ttl will be the ones for
+     the first row matching the query.
    - Windows is now a supported platform. Powershell execution for startup scripts
      is highly recommended and can be enabled via an administrator command-prompt
      with: 'powershell set-executionpolicy unrestricted'

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index b49d37b..2749d35 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -1843,7 +1843,10 @@ A number of functions are provided to "convert" the native types into binary dat
 
 h2(#aggregates). Aggregates
 
-CQL3 distinguishes between built-in aggregates (so called 'native aggregates') and "user-defined aggregates":#udas.  CQL3 includes several native aggregates, described below:
+Aggregate functions work on a set of rows. They receive values for each row and return one value for the whole set.
+If @normal@ columns, @scalar functions@, @UDT@ fields, @writetime@ or @ttl@ are selected together with aggregate functions, the values returned for them will be the ones of the first row matching the query.
+
+CQL3 distinguishes between built-in aggregates (so called 'native aggregates') and "user-defined aggregates":#udas. CQL3 includes several native aggregates, described below:
 
 h3(#countFct). Count
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index 956efca..abf52e1 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.selection;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import com.google.common.collect.Iterables;
@@ -49,12 +48,6 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector
             if (factories.doesAggregation())
                 throw new InvalidRequestException("aggregate functions cannot be used as arguments of aggregate functions");
         }
-        else
-        {
-            if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions())
-                throw new InvalidRequestException(String.format("arguments of function %s must be either all aggregates or no aggregates",
-                                                                fun.name()));
-        }
 
         return new Factory()
         {
@@ -109,7 +102,7 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector
 
             public boolean isAggregateSelectorFactory()
             {
-                return fun.isAggregate() || factories.containsOnlyAggregateFunctions();
+                return fun.isAggregate() || factories.doesAggregation();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index ea66fcc..13e030f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -485,9 +485,6 @@ public abstract class Selection
                   factories.containsTTLSelectorFactory());
 
             this.factories = factories;
-
-            if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions())
-                throw new InvalidRequestException("the select clause must either contain only aggregates or no aggregate");
         }
 
         @Override
@@ -506,7 +503,7 @@ public abstract class Selection
 
         public boolean isAggregate()
         {
-            return factories.containsOnlyAggregateFunctions();
+            return factories.doesAggregation();
         }
 
         protected Selectors newSelectors() throws InvalidRequestException
@@ -523,7 +520,7 @@ public abstract class Selection
 
                 public boolean isAggregate()
                 {
-                    return factories.containsOnlyAggregateFunctions();
+                    return factories.doesAggregation();
                 }
 
                 public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index 81905e6..5ea2957 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@ -109,18 +109,6 @@ final class SelectorFactories implements Iterable<Selector.Factory>
     }
 
     /**
-     * Checks if this <code>SelectorFactories</code> contains only factories for aggregates.
-     *
-     * @return <code>true</code> if this <code>SelectorFactories</code> contains only factories for aggregates,
-     * <code>false</code> otherwise.
-     */
-    public boolean containsOnlyAggregateFunctions()
-    {
-        int size = factories.size();
-        return  size != 0 && numberOfAggregateFactories == size;
-    }
-
-    /**
      * Whether the selector built by this factory does aggregation or not (either directly or in a sub-selector).
      *
      * @return <code>true</code> if the selector built by this factor does aggregation, <code>false</code> otherwise.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index 6c4dc04..2e0514a 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -31,6 +31,7 @@ public final class SimpleSelector extends Selector
     private final int idx;
     private final AbstractType<?> type;
     private ByteBuffer current;
+    private boolean isSet;
 
     public static Factory newFactory(final ColumnDefinition def, final int idx)
     {
@@ -64,7 +65,11 @@ public final class SimpleSelector extends Selector
     @Override
     public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
-        current = rs.current.get(idx);
+        if (!isSet)
+        {
+            isSet = true;
+            current = rs.current.get(idx);
+        }
     }
 
     @Override
@@ -76,6 +81,7 @@ public final class SimpleSelector extends Selector
     @Override
     public void reset()
     {
+        isSet = false;
         current = null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index b3607f3..131827f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.cql3.selection;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -34,6 +33,7 @@ final class WritetimeOrTTLSelector extends Selector
     private final int idx;
     private final boolean isWritetime;
     private ByteBuffer current;
+    private boolean isSet;
 
     public static Factory newFactory(final ColumnDefinition def, final int idx, final boolean isWritetime)
     {
@@ -73,6 +73,11 @@ final class WritetimeOrTTLSelector extends Selector
 
     public void addInput(int protocolVersion, ResultSetBuilder rs)
     {
+        if (isSet)
+            return;
+
+        isSet = true;
+
         if (isWritetime)
         {
             long ts = rs.timestamps[idx];
@@ -92,6 +97,7 @@ final class WritetimeOrTTLSelector extends Selector
 
     public void reset()
     {
+        isSet = false;
         current = null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b99b5d3/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index 62461b8..51e4a28 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -24,19 +24,26 @@ import java.util.Date;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.time.DateUtils;
-import org.junit.Assert;
+
 import org.junit.Test;
 
+import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.UntypedResultSet.Row;
 import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class AggregationTest extends CQLTester
 {
     @Test
@@ -85,6 +92,116 @@ public class AggregationTest extends CQLTester
     }
 
     @Test
+    public void testAggregateWithColumns() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, primary key (a, b))");
+
+        // Test with empty table
+        assertColumnNames(execute("SELECT count(b), max(b) as max, b, c as first FROM %s"),
+                          "system.count(b)", "max", "b", "first");
+        assertRows(execute("SELECT count(b), max(b) as max, b, c as first FROM %s"),
+                           row(0L, null, null, null));
+
+        execute("INSERT INTO %s (a, b, c) VALUES (1, 2, null)");
+        execute("INSERT INTO %s (a, b, c) VALUES (2, 4, 6)");
+        execute("INSERT INTO %s (a, b, c) VALUES (4, 8, 12)");
+
+        assertRows(execute("SELECT count(b), max(b) as max, b, c as first FROM %s"),
+                   row(3L, 8, 2, null));
+    }
+
+    @Test
+    public void testAggregateWithUdtFields() throws Throwable
+    {
+        String myType = createType("CREATE TYPE %s (x int)");
+        createTable("CREATE TABLE %s (a int primary key, b frozen<" + myType + ">, c frozen<" + myType + ">)");
+
+        // Test with empty table
+        assertColumnNames(execute("SELECT count(b.x), max(b.x) as max, b.x, c.x as first FROM %s"),
+                          "system.count(b.x)", "max", "b.x", "first");
+        assertRows(execute("SELECT count(b.x), max(b.x) as max, b.x, c.x as first FROM %s"),
+                           row(0L, null, null, null));
+
+        execute("INSERT INTO %s (a, b, c) VALUES (1, {x:2}, null)");
+        execute("INSERT INTO %s (a, b, c) VALUES (2, {x:4}, {x:6})");
+        execute("INSERT INTO %s (a, b, c) VALUES (4, {x:8}, {x:12})");
+
+        assertRows(execute("SELECT count(b.x), max(b.x) as max, b.x, c.x as first FROM %s"),
+                   row(3L, 8, 2, null));
+
+        assertInvalidMessage("Invalid field selection: max(b) of type blob is not a user type",
+                             "SELECT max(b).x as max FROM %s");
+    }
+
+    @Test
+    public void testAggregateWithFunctions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b double, c double, primary key(a, b))");
+
+        String copySign = createFunction(KEYSPACE,
+                                         "double, double",
+                                         "CREATE OR REPLACE FUNCTION %s(magnitude double, sign double) " +
+                                         "RETURNS NULL ON NULL INPUT " +
+                                         "RETURNS double " +
+                                         "LANGUAGE JAVA " +
+                                         "AS 'return Double.valueOf(Math.copySign(magnitude, sign));';");
+
+        // Test with empty table
+        assertColumnNames(execute("SELECT count(b), max(b) as max, " + copySign + "(b, c), " + copySign + "(c, b) as first FROM %s"),
+                          "system.count(b)", "max", copySign + "(b, c)", "first");
+        assertRows(execute("SELECT count(b), max(b) as max, " + copySign + "(b, c), " + copySign + "(c, b) as first FROM %s"),
+                           row(0L, null, null, null));
+
+        execute("INSERT INTO %s (a, b, c) VALUES (0, -1.2, 2.1)");
+        execute("INSERT INTO %s (a, b, c) VALUES (0, 1.3, -3.4)");
+        execute("INSERT INTO %s (a, b, c) VALUES (0, 1.4, 1.2)");
+
+        assertRows(execute("SELECT count(b), max(b) as max, " + copySign + "(b, c), " + copySign + "(c, b) as first FROM %s"),
+                   row(3L, 1.4, 1.2, -2.1));
+
+        execute("INSERT INTO %s (a, b, c) VALUES (1, -1.2, null)");
+        execute("INSERT INTO %s (a, b, c) VALUES (1, 1.3, -3.4)");
+        execute("INSERT INTO %s (a, b, c) VALUES (1, 1.4, 1.2)");
+        assertRows(execute("SELECT count(b), max(b) as max, " + copySign + "(b, c), " + copySign + "(c, b) as first FROM %s WHERE a = 1"),
+                   row(3L, 1.4, null, null));
+    }
+
+    @Test
+    public void testAggregateWithWithWriteTimeOrTTL() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int, c int)");
+
+        // Test with empty table
+        assertColumnNames(execute("SELECT count(writetime(b)), min(ttl(b)) as min, writetime(b), ttl(c) as first FROM %s"),
+                          "system.count(writetime(b))", "min", "writetime(b)", "first");
+        assertRows(execute("SELECT count(writetime(b)), min(ttl(b)) as min, writetime(b), ttl(c) as first FROM %s"),
+                           row(0L, null, null, null));
+
+        long today = System.currentTimeMillis() * 1000;
+        long yesterday = today - (DateUtils.MILLIS_PER_DAY * 1000);
+
+        execute("INSERT INTO %s (a, b, c) VALUES (1, 2, null) USING TTL 5;");
+        execute("INSERT INTO %s (a, b, c) VALUES (2, 4, 6) USING TTL 2;");
+        execute("INSERT INTO %s (a, b, c) VALUES (4, 8, 12) USING TIMESTAMP " + yesterday );
+
+        assertRows(execute("SELECT count(writetime(b)), count(ttl(b)) FROM %s"),
+                   row(3L, 2L));
+
+        UntypedResultSet resultSet = execute("SELECT min(ttl(b)), ttl(b) FROM %s");
+        assertEquals(1, resultSet.size());
+        Row row = resultSet.one();
+        assertTrue(row.getInt("ttl(b)") > 4);
+        assertTrue(row.getInt("system.min(ttl(b))") <= 2);
+
+        resultSet = execute("SELECT min(writetime(b)), writetime(b) FROM %s");
+        assertEquals(1, resultSet.size());
+        row = resultSet.one();
+
+        assertTrue(row.getLong("writetime(b)") >= today);
+        assertTrue(row.getLong("system.min(writetime(b))") == yesterday);
+    }
+
+    @Test
     public void testFunctionsWithCompactStorage() throws Throwable
     {
         createTable("CREATE TABLE %s (a int , b int, c double, primary key(a, b) ) WITH COMPACT STORAGE");
@@ -113,8 +230,6 @@ public class AggregationTest extends CQLTester
         execute("INSERT INTO %s (a, b, c) VALUES (1, 3, 8)");
 
         assertInvalidSyntax("SELECT max(b), max(c) FROM %s WHERE max(a) = 1");
-        assertInvalidMessage("only aggregates or no aggregate", "SELECT max(b), c FROM %s");
-        assertInvalidMessage("only aggregates or no aggregate", "SELECT b, max(c) FROM %s");
         assertInvalidMessage("aggregate functions cannot be used as arguments of aggregate functions", "SELECT max(sum(c)) FROM %s");
         assertInvalidSyntax("SELECT COUNT(2) FROM %s");
     }
@@ -155,8 +270,8 @@ public class AggregationTest extends CQLTester
         assertRows(execute("SELECT " + copySign + "(max(c), min(c)) FROM %s"), row(-1.4));
         assertRows(execute("SELECT " + copySign + "(c, d) FROM %s"), row(1.2), row(-1.3), row(1.4));
         assertRows(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), row(1.4));
-        assertInvalidMessage("must be either all aggregates or no aggregates", "SELECT " + copySign + "(c, max(c)) FROM %s");
-        assertInvalidMessage("must be either all aggregates or no aggregates", "SELECT " + copySign + "(max(c), c) FROM %s");
+        assertRows(execute("SELECT " + copySign + "(c, max(c)) FROM %s"), row(1.2));
+        assertRows(execute("SELECT " + copySign + "(max(c), c) FROM %s"), row(-1.4));;
     }
 
     @Test
@@ -824,10 +939,10 @@ public class AggregationTest extends CQLTester
                                        "STYPE int");
 
             ResultMessage.Prepared prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
-            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+            assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
             execute("DROP AGGREGATE " + a + "(int)");
-            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+            assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
             //
 
@@ -836,11 +951,11 @@ public class AggregationTest extends CQLTester
                     "STYPE int");
 
             prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
-            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+            assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
             execute("DROP KEYSPACE " + otherKS + ";");
 
-            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+            assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
         }
         finally
         {