You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/25 12:21:34 UTC

[1/4] storm git commit: [STORM-2118] A few fixes for storm-sql standalone mode

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 68bffe37d -> 8462725be


[STORM-2118] A few fixes for storm-sql standalone mode

1. Cast the result, accumulator and value types correctly
2. Support aggregate functions with more than one argument


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/30016384
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/30016384
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/30016384

Branch: refs/heads/1.x-branch
Commit: 3001638431f29cd9243e4b9016692b6cec505bca
Parents: 68bffe3
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Sep 15 15:42:25 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:19:39 2016 +0900

----------------------------------------------------------------------
 .../backends/standalone/RelNodeCompiler.java    | 39 ++++++++------------
 .../test/org/apache/storm/sql/TestStormSql.java |  7 +++-
 .../test/org/apache/storm/sql/TestUtils.java    | 21 +++++++++++
 3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 9df0496..c6adb28 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -320,22 +320,19 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
 
   private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) {
     String resultName = varName + "_result";
+    Class<?> accumulatorType = aggFn.accumulatorType;
+    Class<?> resultType = aggFn.resultType;
     List<String> args = new ArrayList<>();
     if (!aggFn.isStatic) {
       String aggObjName = String.format("%s_obj", varName);
-      String aggObjClassName = (aggFn.initMethod.getDeclaringClass().getCanonicalName());
-      boolean genericType = aggFn.initMethod.getDeclaringClass().getTypeParameters().length > 0;
-      if (genericType) {
-        pw.println("          @SuppressWarnings(\"unchecked\")");
-        pw.print(String.format("          final %1$s<%3$s> %2$s = (%1$s<%3$s>) accumulators.get(\"%2$s\");", aggObjClassName,
-                                 aggObjName, Primitives.wrap((Class<?>) ty).getCanonicalName()));
-      } else {
-        pw.print(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, aggObjName));
-      }
+      String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+      pw.println("          @SuppressWarnings(\"unchecked\")");
+      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+              aggObjName));
       args.add(aggObjName);
     }
-    args.add(String.format("(%s)accumulators.get(\"%s\")", ((Class<?>) ty).getCanonicalName(), varName));
-    pw.print(String.format("          final %s %s = %s;", ((Class<?>) ty).getCanonicalName(),
+    args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName));
+    pw.println(String.format("          final %s %s = %s;", resultType.getCanonicalName(),
                              resultName, ExprCompiler.printMethodCall(aggFn.resultMethod, args)));
 
     return resultName;
@@ -350,8 +347,6 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
         if (call.getArgList().size() != 0) {
           throw new UnsupportedOperationException("Count with nullable fields");
         }
-      } else {
-        throw new IllegalArgumentException("Aggregate call should have one argument");
       }
     }
     if (aggFunction instanceof SqlUserDefinedAggFunction) {
@@ -378,30 +373,28 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
 
   private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) {
     List<String> args = new ArrayList<>();
+    Class<?> accumulatorType = aggFn.accumulatorType;
     if (!aggFn.isStatic) {
       String aggObjName = String.format("%s_obj", varName);
       String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
       pw.println(String.format("          if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
       pw.println(String.format("            accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
       pw.println("          }");
-      boolean genericType = aggFn.initMethod.getDeclaringClass().getTypeParameters().length > 0;
-      if (genericType) {
-        pw.println("          @SuppressWarnings(\"unchecked\")");
-        pw.println(String.format("          final %1$s<%3$s> %2$s = (%1$s<%3$s>) accumulators.get(\"%2$s\");", aggObjClassName,
-                                 aggObjName, Primitives.wrap((Class<?>) ty).getCanonicalName()));
-      } else {
-        pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, aggObjName));
-      }
+      pw.println("          @SuppressWarnings(\"unchecked\")");
+      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+              aggObjName));
       args.add(aggObjName);
     }
     args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
                            "accumulators.get(\"" + varName + "\")",
                            ExprCompiler.printMethodCall(aggFn.initMethod, args),
-                           Primitives.wrap((Class<?>) ty).getCanonicalName()));
+                           accumulatorType.getCanonicalName()));
     if (argList.isEmpty()) {
       args.add("EMPTY_VALUES");
     } else {
-      args.add(String.format("(%s) %s", ((Class<?>) ty).getCanonicalName(), "_data.get(" + argList.get(0) + ")"));
+      for (int i = 0; i < aggFn.valueTypes.size(); i++) {
+        args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
+      }
     }
     pw.print(String.format("          accumulators.put(\"%s\", %s);\n",
                            varName,

http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 7b8554e..025219b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -292,7 +292,8 @@ public class TestStormSql {
     List<String> stmt = new ArrayList<>();
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
     stmt.add("CREATE FUNCTION MYCONCAT AS 'org.apache.storm.sql.TestUtils$MyConcat'");
-    stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME) FROM FOO GROUP BY (ID)");
+    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
+    stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) FROM FOO GROUP BY (ID)");
     StormSql sql = StormSql.construct();
     List<Values> values = new ArrayList<>();
     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
@@ -300,12 +301,16 @@ public class TestStormSql {
     Assert.assertEquals(4, values.size());
     Assert.assertEquals(3, values.get(0).get(1));
     Assert.assertEquals("xxx", values.get(0).get(2));
+    Assert.assertEquals(Arrays.asList(2, 1), values.get(0).get(3));
     Assert.assertEquals(12, values.get(1).get(1));
     Assert.assertEquals("xxx", values.get(1).get(2));
+    Assert.assertEquals(Arrays.asList(5, 4), values.get(1).get(3));
     Assert.assertEquals(21, values.get(2).get(1));
     Assert.assertEquals("xxx", values.get(2).get(2));
+    Assert.assertEquals(Arrays.asList(8, 7), values.get(2).get(3));
     Assert.assertEquals(9, values.get(3).get(1));
     Assert.assertEquals("x", values.get(3).get(2));
+    Assert.assertEquals(Arrays.asList(9), values.get(3).get(3));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/30016384/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 5d0384a..9eceaf5 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -36,9 +36,11 @@ import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 
 public class TestUtils {
   public static class MyPlus {
@@ -60,6 +62,25 @@ public class TestUtils {
   }
 
 
+  public static class TopN {
+    public static PriorityQueue<Integer> init() {
+      return new PriorityQueue<>();
+    }
+    public static PriorityQueue<Integer> add(PriorityQueue<Integer> accumulator, Integer n, Integer val) {
+      if (accumulator.size() >= n) {
+        accumulator.remove();
+      }
+      accumulator.add(val);
+      return accumulator;
+    }
+    public static List<Integer> result(PriorityQueue<Integer> accumulator) {
+      List<Integer> res = new ArrayList<>(accumulator);
+      Collections.reverse(res);
+      return res;
+    }
+  }
+
+
   public static class MockDataSource implements DataSource {
     private final ArrayList<Values> RECORDS = new ArrayList<>();
 


[4/4] storm git commit: add STORM-2118 to CHANGELOG

Posted by ka...@apache.org.
add STORM-2118 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8462725b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8462725b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8462725b

Branch: refs/heads/1.x-branch
Commit: 8462725bec2f9ad16f1c36ec2f006b725c0b9e1a
Parents: 8ada12f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Sep 25 21:20:21 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:20:21 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8462725b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index be29931..f7091c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,7 @@
  * STORM-1720: Support GEO in storm-redis
 
 ## 1.0.3
+ * STORM-2118: A few fixes for storm-sql standalone mode
  * STORM-2119: bug in log message printing to stdout
  * STORM-2120: Emit to _spoutConfig.outputStreamId
  * STORM-2101: fixes npe in compute-executors in nimbus


[3/4] storm git commit: Merge branch 'STORM-2118-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2118-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ada12fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ada12fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ada12fd

Branch: refs/heads/1.x-branch
Commit: 8ada12fd3391a5c9b06aa10007e46416123cbdb2
Parents: 68bffe3 324a9dd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Sep 25 21:20:00 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:20:00 2016 +0900

----------------------------------------------------------------------
 .../backends/standalone/RelNodeCompiler.java    | 39 ++++++++------------
 .../test/org/apache/storm/sql/TestStormSql.java |  7 +++-
 .../test/org/apache/storm/sql/TestUtils.java    | 28 ++++++++++++++
 3 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: [STORM-2118] address review comments

Posted by ka...@apache.org.
[STORM-2118] address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/324a9dd3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/324a9dd3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/324a9dd3

Branch: refs/heads/1.x-branch
Commit: 324a9dd36a80eb26a51214bba086369f3debf47d
Parents: 3001638
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Sep 23 11:27:45 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Sep 25 21:19:43 2016 +0900

----------------------------------------------------------------------
 .../src/test/org/apache/storm/sql/TestUtils.java         | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/324a9dd3/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 9eceaf5..58c3f26 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -67,10 +67,17 @@ public class TestUtils {
       return new PriorityQueue<>();
     }
     public static PriorityQueue<Integer> add(PriorityQueue<Integer> accumulator, Integer n, Integer val) {
+      if (n <= 0) {
+        return accumulator;
+      }
       if (accumulator.size() >= n) {
-        accumulator.remove();
+        if (val > accumulator.peek()) {
+          accumulator.remove();
+          accumulator.add(val);
+        }
+      } else {
+        accumulator.add(val);
       }
-      accumulator.add(val);
       return accumulator;
     }
     public static List<Integer> result(PriorityQueue<Integer> accumulator) {