You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/25 12:21:34 UTC
[1/4] storm git commit: [STORM-2118] A few fixes for storm-sql standalone mode
Repository: storm
Updated Branches:
refs/heads/1.x-branch 68bffe37d -> 8462725be
[STORM-2118] A few fixes for storm-sql standalone mode
1. Cast the result, accumulator and value types correctly
2. Support aggregate functions with more than one argument
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/30016384
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/30016384
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/30016384
Branch: refs/heads/1.x-branch
Commit: 3001638431f29cd9243e4b9016692b6cec505bca
Parents: 68bffe3
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Sep 15 15:42:25 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:19:39 2016 +0900
----------------------------------------------------------------------
.../backends/standalone/RelNodeCompiler.java | 39 ++++++++------------
.../test/org/apache/storm/sql/TestStormSql.java | 7 +++-
.../test/org/apache/storm/sql/TestUtils.java | 21 +++++++++++
3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 9df0496..c6adb28 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -320,22 +320,19 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) {
String resultName = varName + "_result";
+ Class<?> accumulatorType = aggFn.accumulatorType;
+ Class<?> resultType = aggFn.resultType;
List<String> args = new ArrayList<>();
if (!aggFn.isStatic) {
String aggObjName = String.format("%s_obj", varName);
- String aggObjClassName = (aggFn.initMethod.getDeclaringClass().getCanonicalName());
- boolean genericType = aggFn.initMethod.getDeclaringClass().getTypeParameters().length > 0;
- if (genericType) {
- pw.println(" @SuppressWarnings(\"unchecked\")");
- pw.print(String.format(" final %1$s<%3$s> %2$s = (%1$s<%3$s>) accumulators.get(\"%2$s\");", aggObjClassName,
- aggObjName, Primitives.wrap((Class<?>) ty).getCanonicalName()));
- } else {
- pw.print(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, aggObjName));
- }
+ String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+ pw.println(" @SuppressWarnings(\"unchecked\")");
+ pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+ aggObjName));
args.add(aggObjName);
}
- args.add(String.format("(%s)accumulators.get(\"%s\")", ((Class<?>) ty).getCanonicalName(), varName));
- pw.print(String.format(" final %s %s = %s;", ((Class<?>) ty).getCanonicalName(),
+ args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName));
+ pw.println(String.format(" final %s %s = %s;", resultType.getCanonicalName(),
resultName, ExprCompiler.printMethodCall(aggFn.resultMethod, args)));
return resultName;
@@ -350,8 +347,6 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
if (call.getArgList().size() != 0) {
throw new UnsupportedOperationException("Count with nullable fields");
}
- } else {
- throw new IllegalArgumentException("Aggregate call should have one argument");
}
}
if (aggFunction instanceof SqlUserDefinedAggFunction) {
@@ -378,30 +373,28 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) {
List<String> args = new ArrayList<>();
+ Class<?> accumulatorType = aggFn.accumulatorType;
if (!aggFn.isStatic) {
String aggObjName = String.format("%s_obj", varName);
String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
pw.println(String.format(" if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
pw.println(String.format(" accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
pw.println(" }");
- boolean genericType = aggFn.initMethod.getDeclaringClass().getTypeParameters().length > 0;
- if (genericType) {
- pw.println(" @SuppressWarnings(\"unchecked\")");
- pw.println(String.format(" final %1$s<%3$s> %2$s = (%1$s<%3$s>) accumulators.get(\"%2$s\");", aggObjClassName,
- aggObjName, Primitives.wrap((Class<?>) ty).getCanonicalName()));
- } else {
- pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, aggObjName));
- }
+ pw.println(" @SuppressWarnings(\"unchecked\")");
+ pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+ aggObjName));
args.add(aggObjName);
}
args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
"accumulators.get(\"" + varName + "\")",
ExprCompiler.printMethodCall(aggFn.initMethod, args),
- Primitives.wrap((Class<?>) ty).getCanonicalName()));
+ accumulatorType.getCanonicalName()));
if (argList.isEmpty()) {
args.add("EMPTY_VALUES");
} else {
- args.add(String.format("(%s) %s", ((Class<?>) ty).getCanonicalName(), "_data.get(" + argList.get(0) + ")"));
+ for (int i = 0; i < aggFn.valueTypes.size(); i++) {
+ args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
+ }
}
pw.print(String.format(" accumulators.put(\"%s\", %s);\n",
varName,
http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 7b8554e..025219b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -292,7 +292,8 @@ public class TestStormSql {
List<String> stmt = new ArrayList<>();
stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
stmt.add("CREATE FUNCTION MYCONCAT AS 'org.apache.storm.sql.TestUtils$MyConcat'");
- stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME) FROM FOO GROUP BY (ID)");
+ stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
+ stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) FROM FOO GROUP BY (ID)");
StormSql sql = StormSql.construct();
List<Values> values = new ArrayList<>();
ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
@@ -300,12 +301,16 @@ public class TestStormSql {
Assert.assertEquals(4, values.size());
Assert.assertEquals(3, values.get(0).get(1));
Assert.assertEquals("xxx", values.get(0).get(2));
+ Assert.assertEquals(Arrays.asList(2, 1), values.get(0).get(3));
Assert.assertEquals(12, values.get(1).get(1));
Assert.assertEquals("xxx", values.get(1).get(2));
+ Assert.assertEquals(Arrays.asList(5, 4), values.get(1).get(3));
Assert.assertEquals(21, values.get(2).get(1));
Assert.assertEquals("xxx", values.get(2).get(2));
+ Assert.assertEquals(Arrays.asList(8, 7), values.get(2).get(3));
Assert.assertEquals(9, values.get(3).get(1));
Assert.assertEquals("x", values.get(3).get(2));
+ Assert.assertEquals(Arrays.asList(9), values.get(3).get(3));
}
@Test
http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 5d0384a..9eceaf5 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -36,9 +36,11 @@ import org.apache.storm.trident.tuple.TridentTuple;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
public class TestUtils {
public static class MyPlus {
@@ -60,6 +62,25 @@ public class TestUtils {
}
+ public static class TopN {
+ public static PriorityQueue<Integer> init() {
+ return new PriorityQueue<>();
+ }
+ public static PriorityQueue<Integer> add(PriorityQueue<Integer> accumulator, Integer n, Integer val) {
+ if (accumulator.size() >= n) {
+ accumulator.remove();
+ }
+ accumulator.add(val);
+ return accumulator;
+ }
+ public static List<Integer> result(PriorityQueue<Integer> accumulator) {
+ List<Integer> res = new ArrayList<>(accumulator);
+ Collections.reverse(res);
+ return res;
+ }
+ }
+
+
public static class MockDataSource implements DataSource {
private final ArrayList<Values> RECORDS = new ArrayList<>();
[4/4] storm git commit: add STORM-2118 to CHANGELOG
Posted by ka...@apache.org.
add STORM-2118 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8462725b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8462725b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8462725b
Branch: refs/heads/1.x-branch
Commit: 8462725bec2f9ad16f1c36ec2f006b725c0b9e1a
Parents: 8ada12f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Sep 25 21:20:21 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:20:21 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8462725b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index be29931..f7091c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,7 @@
* STORM-1720: Support GEO in storm-redis
## 1.0.3
+ * STORM-2118: A few fixes for storm-sql standalone mode
* STORM-2119: bug in log message printing to stdout
* STORM-2120: Emit to _spoutConfig.outputStreamId
* STORM-2101: fixes npe in compute-executors in nimbus
[3/4] storm git commit: Merge branch 'STORM-2118-1.x' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2118-1.x' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ada12fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ada12fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ada12fd
Branch: refs/heads/1.x-branch
Commit: 8ada12fd3391a5c9b06aa10007e46416123cbdb2
Parents: 68bffe3 324a9dd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Sep 25 21:20:00 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:20:00 2016 +0900
----------------------------------------------------------------------
.../backends/standalone/RelNodeCompiler.java | 39 ++++++++------------
.../test/org/apache/storm/sql/TestStormSql.java | 7 +++-
.../test/org/apache/storm/sql/TestUtils.java | 28 ++++++++++++++
3 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: [STORM-2118] address review comments
Posted by ka...@apache.org.
[STORM-2118] address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/324a9dd3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/324a9dd3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/324a9dd3
Branch: refs/heads/1.x-branch
Commit: 324a9dd36a80eb26a51214bba086369f3debf47d
Parents: 3001638
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Sep 23 11:27:45 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:19:43 2016 +0900
----------------------------------------------------------------------
.../src/test/org/apache/storm/sql/TestUtils.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/324a9dd3/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 9eceaf5..58c3f26 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -67,10 +67,17 @@ public class TestUtils {
return new PriorityQueue<>();
}
public static PriorityQueue<Integer> add(PriorityQueue<Integer> accumulator, Integer n, Integer val) {
+ if (n <= 0) {
+ return accumulator;
+ }
if (accumulator.size() >= n) {
- accumulator.remove();
+ if (val > accumulator.peek()) {
+ accumulator.remove();
+ accumulator.add(val);
+ }
+ } else {
+ accumulator.add(val);
}
- accumulator.add(val);
return accumulator;
}
public static List<Integer> result(PriorityQueue<Integer> accumulator) {