You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/12/24 14:06:50 UTC
[1/3] cassandra git commit: Support counter-columns for native
aggregates (sum, avg, max, min)
Repository: cassandra
Updated Branches:
refs/heads/trunk c9ef25fd8 -> 53e370f62
Support counter-columns for native aggregates (sum,avg,max,min)
patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-9977
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8287ebcb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8287ebcb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8287ebcb
Branch: refs/heads/trunk
Commit: 8287ebcb6ad46529ca90600dc0c2f98ecab89cf0
Parents: ee36f14
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Dec 24 14:05:16 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Dec 24 14:05:16 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/cql3/functions/AggregateFcts.java | 141 +++++++++++--------
.../cassandra/cql3/functions/Functions.java | 2 +
.../cql3/validation/entities/UFTest.java | 26 ++++
.../validation/operations/AggregationTest.java | 42 ++++++
5 files changed, 156 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb0b151..c0fd4f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * Support counter-columns for native aggregates (sum,avg,max,min) (CASSANDRA-9977)
* (cqlsh) show correct column names for empty result sets (CASSANDRA-9813)
* Add new types to Stress (CASSANDRA-9556)
* Add property to allow listening on broadcast interface (CASSANDRA-9748)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
index 41e43c0..77be525 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.db.marshal.DecimalType;
import org.apache.cassandra.db.marshal.DoubleType;
import org.apache.cassandra.db.marshal.FloatType;
@@ -480,31 +481,7 @@ public abstract class AggregateFcts
{
public Aggregate newAggregate()
{
- return new Aggregate()
- {
- private long sum;
-
- public void reset()
- {
- sum = 0;
- }
-
- public ByteBuffer compute(int protocolVersion)
- {
- return ((LongType) returnType()).decompose(sum);
- }
-
- public void addInput(int protocolVersion, List<ByteBuffer> values)
- {
- ByteBuffer value = values.get(0);
-
- if (value == null)
- return;
-
- Number number = ((Number) argTypes().get(0).compose(value));
- sum += number.longValue();
- }
- };
+ return new LongSumAggregate();
}
};
@@ -516,37 +493,7 @@ public abstract class AggregateFcts
{
public Aggregate newAggregate()
{
- return new Aggregate()
- {
- private long sum;
-
- private int count;
-
- public void reset()
- {
- count = 0;
- sum = 0;
- }
-
- public ByteBuffer compute(int protocolVersion)
- {
- long avg = count == 0 ? 0 : sum / count;
-
- return ((LongType) returnType()).decompose(avg);
- }
-
- public void addInput(int protocolVersion, List<ByteBuffer> values)
- {
- ByteBuffer value = values.get(0);
-
- if (value == null)
- return;
-
- count++;
- Number number = ((Number) argTypes().get(0).compose(value));
- sum += number.longValue();
- }
- };
+ return new LongAvgAggregate();
}
};
@@ -707,6 +654,30 @@ public abstract class AggregateFcts
};
/**
+ * The SUM function for counter column values.
+ */
+ public static final AggregateFunction sumFunctionForCounter =
+ new NativeAggregateFunction("sum", CounterColumnType.instance, CounterColumnType.instance)
+ {
+ public Aggregate newAggregate()
+ {
+ return new LongSumAggregate();
+ }
+ };
+
+ /**
+ * The AVG function for counter column values.
+ */
+ public static final AggregateFunction avgFunctionForCounter =
+ new NativeAggregateFunction("avg", CounterColumnType.instance, CounterColumnType.instance)
+ {
+ public Aggregate newAggregate()
+ {
+ return new LongAvgAggregate();
+ }
+ };
+
+ /**
* Creates a MAX function for the specified type.
*
* @param inputType the function input and output type
@@ -827,4 +798,62 @@ public abstract class AggregateFcts
}
};
}
+
+ private static class LongSumAggregate implements AggregateFunction.Aggregate
+ {
+ private long sum;
+
+ public void reset()
+ {
+ sum = 0;
+ }
+
+ public ByteBuffer compute(int protocolVersion)
+ {
+ return LongType.instance.decompose(sum);
+ }
+
+ public void addInput(int protocolVersion, List<ByteBuffer> values)
+ {
+ ByteBuffer value = values.get(0);
+
+ if (value == null)
+ return;
+
+ Number number = LongType.instance.compose(value);
+ sum += number.longValue();
+ }
+ }
+
+ private static class LongAvgAggregate implements AggregateFunction.Aggregate
+ {
+ private long sum;
+
+ private int count;
+
+ public void reset()
+ {
+ count = 0;
+ sum = 0;
+ }
+
+ public ByteBuffer compute(int protocolVersion)
+ {
+ long avg = count == 0 ? 0 : sum / count;
+
+ return LongType.instance.decompose(avg);
+ }
+
+ public void addInput(int protocolVersion, List<ByteBuffer> values)
+ {
+ ByteBuffer value = values.get(0);
+
+ if (value == null)
+ return;
+
+ count++;
+ Number number = LongType.instance.compose(value);
+ sum += number.longValue();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index e31fc9f..0f1af19 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -91,6 +91,7 @@ public abstract class Functions
declare(AggregateFcts.sumFunctionForDouble);
declare(AggregateFcts.sumFunctionForDecimal);
declare(AggregateFcts.sumFunctionForVarint);
+ declare(AggregateFcts.sumFunctionForCounter);
declare(AggregateFcts.avgFunctionForByte);
declare(AggregateFcts.avgFunctionForShort);
declare(AggregateFcts.avgFunctionForInt32);
@@ -99,6 +100,7 @@ public abstract class Functions
declare(AggregateFcts.avgFunctionForDouble);
declare(AggregateFcts.avgFunctionForVarint);
declare(AggregateFcts.avgFunctionForDecimal);
+ declare(AggregateFcts.avgFunctionForCounter);
MigrationManager.instance.register(new FunctionsMigrationListener());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 0d11a82..bcfe871 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -716,6 +716,32 @@ public class UFTest extends CQLTester
}
@Test
+ public void testJavaFunctionCounter() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key int primary key, val counter)");
+
+ String fName = createFunction(KEYSPACE, "counter",
+ "CREATE OR REPLACE FUNCTION %s(val counter) " +
+ "CALLED ON NULL INPUT " +
+ "RETURNS bigint " +
+ "LANGUAGE JAVA " +
+ "AS 'return val + 1;';");
+
+ execute("UPDATE %s SET val = val + 1 WHERE key = 1");
+ assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+ row(1, 1L, 2L));
+ execute("UPDATE %s SET val = val + 1 WHERE key = 1");
+ assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+ row(1, 2L, 3L));
+ execute("UPDATE %s SET val = val + 2 WHERE key = 1");
+ assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+ row(1, 4L, 5L));
+ execute("UPDATE %s SET val = val - 2 WHERE key = 1");
+ assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+ row(1, 2L, 3L));
+ }
+
+ @Test
public void testFunctionInTargetKeyspace() throws Throwable
{
createTable("CREATE TABLE %s (key int primary key, val double)");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index e661b4f..0e0313c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.FunctionExecutionException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -172,6 +173,47 @@ public class AggregationTest extends CQLTester
}
@Test
+ public void testAggregateOnCounters() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b counter, primary key (a))");
+
+ // Test with empty table
+ assertColumnNames(execute("SELECT count(b), max(b) as max, b FROM %s"),
+ "system.count(b)", "max", "b");
+ assertRows(execute("SELECT count(b), max(b) as max, b FROM %s"),
+ row(0L, null, null));
+
+ execute("UPDATE %s SET b = b + 1 WHERE a = 1");
+ execute("UPDATE %s SET b = b + 1 WHERE a = 1");
+
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(1L, 2L, 2L, 2L, 2L));
+ flush();
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(1L, 2L, 2L, 2L, 2L));
+
+ execute("UPDATE %s SET b = b + 2 WHERE a = 1");
+
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(1L, 4L, 4L, 4L, 4L));
+
+ execute("UPDATE %s SET b = b - 2 WHERE a = 1");
+
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(1L, 2L, 2L, 2L, 2L));
+ flush();
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(1L, 2L, 2L, 2L, 2L));
+
+ execute("UPDATE %s SET b = b + 1 WHERE a = 2");
+ execute("UPDATE %s SET b = b + 1 WHERE a = 2");
+ execute("UPDATE %s SET b = b + 2 WHERE a = 2");
+
+ assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+ row(2L, 4L, 2L, 3L, 6L));
+ }
+
+ @Test
public void testAggregateWithUdtFields() throws Throwable
{
String myType = createType("CREATE TYPE %s (x int)");
[2/3] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by sn...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0858bfbd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0858bfbd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0858bfbd
Branch: refs/heads/trunk
Commit: 0858bfbd7b1e2c1ff43baf1ec96a5fadf115e88d
Parents: c20566f 8287ebc
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Dec 24 14:05:40 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Dec 24 14:05:40 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by sn...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53e370f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53e370f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53e370f6
Branch: refs/heads/trunk
Commit: 53e370f62b53daeb74f3778cd80b63170d23b8c4
Parents: c9ef25f 0858bfb
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Dec 24 14:06:40 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Dec 24 14:06:40 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------