You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:07 UTC
[04/23] storm git commit: STORM-2453 Move non-connectors into the top directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
new file mode 100644
index 0000000..82dc184
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.tuple.Values;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class TestStormSql {
+ private static class MockDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mock";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ return new TestUtils.MockSqlTridentDataSource();
+ }
+ }
+
+ private static class MockNestedDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mocknested";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockNestedDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ return new TestUtils.MockSqlTridentDataSource();
+ }
+ }
+
+ private static class MockGroupDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mockgroup";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockGroupDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ return new TestUtils.MockSqlTridentGroupedDataSource();
+ }
+ }
+
+ private static class MockEmpDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mockemp";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockEmpDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ return new TestUtils.MockSqlTridentJoinDataSourceEmp();
+ }
+ }
+
+ private static class MockDeptDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mockdept";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockDeptDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ return new TestUtils.MockSqlTridentJoinDataSourceDept();
+ }
+ }
+
+
+ @BeforeClass
+ public static void setUp() {
+ DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+ DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
+ DataSourcesRegistry.providerMap().put("mockgroup", new MockGroupDataSourceProvider());
+ DataSourcesRegistry.providerMap().put("mockemp", new MockEmpDataSourceProvider());
+ DataSourcesRegistry.providerMap().put("mockdept", new MockDeptDataSourceProvider());
+ }
+
+ /**
+ * Unregisters every mock provider that {@link #setUp()} put into {@link DataSourcesRegistry},
+ * so none of the five test schemes stays registered once this class has finished.
+ */
+ @AfterClass
+ public static void tearDown() {
+ // Mirror setUp(): "mockgroup", "mockemp" and "mockdept" used to be left behind in the registry.
+ DataSourcesRegistry.providerMap().remove("mock");
+ DataSourcesRegistry.providerMap().remove("mocknested");
+ DataSourcesRegistry.providerMap().remove("mockgroup");
+ DataSourcesRegistry.providerMap().remove("mockemp");
+ DataSourcesRegistry.providerMap().remove("mockdept");
+ }
+
+ @Test
+ public void testExternalDataSource() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+ stmt.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(2, values.size());
+ Assert.assertEquals(4, values.get(0).get(0));
+ Assert.assertEquals(5, values.get(1).get(0));
+ }
+
+ /**
+ * Selects map, nested-map and array columns from the {@code mocknested} source, filtering on a
+ * map entry and an array element, and compares the first collected row with the expected tuple.
+ */
+ @Test
+ public void testExternalDataSourceNested() throws Exception {
+ String createTable =
+ "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
+ String query = "SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200";
+ List<String> statements = new ArrayList<>();
+ statements.add(createTable);
+ statements.add(query);
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ // Debug output of everything the handler collected.
+ System.out.println(collected);
+ Map<String, Integer> innerMap = ImmutableMap.of("b", 2, "c", 4);
+ Map<String, Map<String, Integer>> outerMap = ImmutableMap.of("a", innerMap);
+ Values expectedRow = new Values(2, 4, outerMap, Arrays.asList(100, 200, 300));
+ Assert.assertEquals(expectedRow, collected.get(0));
+ }
+
+ /**
+ * Filters on {@code MAPFIELD['a']} and expects the query to complete with no collected rows.
+ */
+ @Test
+ public void testExternalNestedNonExistKeyAccess() throws Exception {
+ // NOTE(review): the earlier comment here said this query "triggers java.lang.RuntimeException:
+ // Cannot convert null to int", yet the test expects a clean run with zero rows - presumably the
+ // remark describes a past failure; confirm.
+ String createTable =
+ "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
+ String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2";
+ List<String> statements = new ArrayList<>();
+ statements.add(createTable);
+ statements.add(query);
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(0, collected.size());
+ }
+
+ /**
+ * Filters on the two-level lookup {@code NESTEDMAPFIELD['b']['c']} and expects the query to
+ * complete with no collected rows.
+ */
+ @Test
+ public void testExternalNestedNonExistKeyAccess2() throws Exception {
+ // NOTE(review): the earlier comment here said this query "triggers java.lang.RuntimeException:
+ // Cannot convert null to int", yet the test expects a clean run with zero rows - presumably the
+ // remark describes a past failure; confirm.
+ String createTable =
+ "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
+ String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4";
+ List<String> statements = new ArrayList<>();
+ statements.add(createTable);
+ statements.add(query);
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(0, collected.size());
+ }
+
+ /**
+ * Indexes the array column with a string key ({@code ARRAYFIELD['a']}) in the filter and
+ * expects the query to complete with no collected rows.
+ */
+ @Test
+ public void testExternalNestedInvalidAccessStringIndexOnArray() throws Exception {
+ String createTable =
+ "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
+ String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200";
+ List<String> statements = new ArrayList<>();
+ statements.add(createTable);
+ statements.add(query);
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(0, collected.size());
+ }
+
+ /**
+ * Reads {@code ARRAYFIELD[10]} in the filter and expects the query to complete with no
+ * collected rows.
+ */
+ @Test
+ public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
+ String createTable =
+ "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
+ String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200";
+ List<String> statements = new ArrayList<>();
+ statements.add(createTable);
+ statements.add(query);
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(0, collected.size());
+ }
+
+ /**
+ * Calls the {@code MYPLUS} function with the {@code VARCHAR} column {@code NAME}; the test
+ * passes only if a {@link ValidationException} is thrown.
+ */
+ @Test(expected = ValidationException.class)
+ public void testExternalUdfType() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
+ statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
+ statements.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ // Debug output; only reached if execute() returns normally.
+ System.out.println(collected);
+ }
+
+ /**
+ * Compares the result of {@code MYPLUS(ID, 1)} with the string literal {@code 'x'}; the test
+ * passes only if a {@link CompilingClassLoader.CompilerException} is thrown.
+ */
+ @Test(expected = CompilingClassLoader.CompilerException.class)
+ public void testExternalUdfType2() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ // The generated code cannot compile because MYPLUS's return type differs from the type of 'x'.
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
+ statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
+ statements.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(0, collected.size());
+ }
+
+ @Test
+ public void testExternalUdf() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+ stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
+ stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(2, values.size());
+ Assert.assertEquals(4, values.get(0).get(0));
+ Assert.assertEquals(5, values.get(1).get(0));
+ }
+
+ /**
+ * Declares a function with a {@code USING JAR} clause; the test passes only if an
+ * {@link UnsupportedOperationException} is thrown.
+ */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testExternalUdfUsingJar() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+ statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
+ statements.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ }
+
+ @Test
+ public void testGroupbyBuiltin() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ stmt.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(SALARY) FROM FOO GROUP BY (ID)");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(4, values.size());
+ Assert.assertEquals(3, values.get(0).get(2));
+ Assert.assertEquals(12, values.get(1).get(2));
+ Assert.assertEquals(21, values.get(2).get(2));
+ Assert.assertEquals(9, values.get(3).get(2));
+ }
+
+ /**
+ * Applies {@code WHERE ID = 1} before grouping and expects one row:
+ * ID 1, COUNT 3 (as a long), SUM(SALARY) 12 and AVG(PCT) 2.5.
+ */
+ @Test
+ public void testGroupbyBuiltinWithFilter() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ statements.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(1, collected.size());
+ Values onlyRow = collected.get(0);
+ Assert.assertEquals(1, onlyRow.get(0));
+ Assert.assertEquals(3L, onlyRow.get(1));
+ Assert.assertEquals(12, onlyRow.get(2));
+ Assert.assertEquals(2.5, onlyRow.get(3));
+ }
+
+ /**
+ * Mixes the built-in SUM with the user-defined aggregates {@code MYCONCAT} and {@code TOPN}
+ * in one GROUP BY query and checks columns 1-3 of each of the four collected rows.
+ */
+ @Test
+ public void testGroupbyBuiltinAndUDF() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ statements.add("CREATE FUNCTION MYCONCAT AS 'org.apache.storm.sql.TestUtils$MyConcat'");
+ statements.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
+ statements.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) FROM FOO GROUP BY (ID)");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(4, collected.size());
+
+ Values firstRow = collected.get(0);
+ Assert.assertEquals(3, firstRow.get(1));
+ Assert.assertEquals("xxx", firstRow.get(2));
+ Assert.assertEquals(Arrays.asList(2, 1), firstRow.get(3));
+
+ Values secondRow = collected.get(1);
+ Assert.assertEquals(12, secondRow.get(1));
+ Assert.assertEquals("xxx", secondRow.get(2));
+ Assert.assertEquals(Arrays.asList(5, 4), secondRow.get(3));
+
+ Values thirdRow = collected.get(2);
+ Assert.assertEquals(21, thirdRow.get(1));
+ Assert.assertEquals("xxx", thirdRow.get(2));
+ Assert.assertEquals(Arrays.asList(8, 7), thirdRow.get(3));
+
+ Values fourthRow = collected.get(3);
+ Assert.assertEquals(9, fourthRow.get(1));
+ Assert.assertEquals("x", fourthRow.get(2));
+ Assert.assertEquals(Arrays.asList(9), fourthRow.get(3));
+ }
+
+ @Test
+ public void testAggFnNonSqlReturnType() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
+ stmt.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID >= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(4, values.size());
+ Assert.assertEquals(Collections.singletonList(2), values.get(0).get(2));
+ Assert.assertEquals(Collections.singletonList(5), values.get(1).get(2));
+ Assert.assertEquals(Collections.singletonList(8), values.get(2).get(2));
+ Assert.assertEquals(Collections.singletonList(9), values.get(3).get(2));
+ }
+
+ /**
+ * Applies AVG to two different columns in one query (restricted to {@code ID = 1}) and expects
+ * one row: ID 1, COUNT 3 (as a long), AVG(SALARY) 4 and AVG(PCT) 2.5.
+ */
+ @Test
+ public void testGroupbySameAggregateOnDifferentColumns() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ statements.add("SELECT STREAM ID, COUNT(*), AVG(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(1, collected.size());
+ Values onlyRow = collected.get(0);
+ Assert.assertEquals(1, onlyRow.get(0));
+ Assert.assertEquals(3L, onlyRow.get(1));
+ Assert.assertEquals(4, onlyRow.get(2));
+ Assert.assertEquals(2.5, onlyRow.get(3));
+ }
+
+ /**
+ * Uses the {@code STDDEV_POP} aggregate; the test passes only if an
+ * {@link UnsupportedOperationException} is thrown.
+ */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGroupbyBuiltinNotimplemented() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ statements.add("SELECT STREAM ID, COUNT(*), STDDEV_POP(SALARY) FROM FOO GROUP BY (ID)");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ }
+
+ @Test
+ public void testMinMax() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ stmt.add("SELECT STREAM ID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY (ID)");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(4, values.size());
+ Assert.assertEquals(0, values.get(0).get(2));
+ Assert.assertEquals(3, values.get(1).get(2));
+ Assert.assertEquals(6, values.get(2).get(2));
+ Assert.assertEquals(9, values.get(3).get(2));
+
+ Assert.assertEquals(1.5, values.get(0).get(3));
+ Assert.assertEquals(3.0, values.get(1).get(3));
+ Assert.assertEquals(4.5, values.get(2).get(3));
+ Assert.assertEquals(5.0, values.get(3).get(3));
+ }
+ /**
+ * Combines a WHERE filter, GROUP BY and a HAVING clause that mixes the grouping key with an
+ * aggregate; expects a single row with ID 3 and MIN(SALARY) 9.
+ */
+ @Test
+ public void testFilterGroupbyHaving() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+ statements.add("SELECT STREAM ID, MIN(SALARY) FROM FOO where ID > 0 GROUP BY (ID) HAVING ID > 2 AND MAX(SALARY) > 5");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(1, collected.size());
+ Values onlyRow = collected.get(0);
+ Assert.assertEquals(3, onlyRow.get(0));
+ Assert.assertEquals(9, onlyRow.get(1));
+ }
+
+ /**
+ * Groups by two columns ({@code DEPTID, EMPID}) and expects seven collected rows, the first of
+ * which has DEPTID 0, EMPID 0 and COUNT 2 (as a long).
+ */
+ @Test
+ public void testGroupByMultipleFields() throws Exception {
+ List<Values> collected = new ArrayList<>();
+ ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
+ List<String> statements = new ArrayList<>();
+ statements.add("CREATE EXTERNAL TABLE FOO (DEPTID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR, EMPID INT) LOCATION 'mockgroup:///foo'");
+ statements.add("SELECT STREAM DEPTID, EMPID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY DEPTID, EMPID");
+ StormSql stormSql = StormSql.construct();
+ stormSql.execute(statements, collector);
+ Assert.assertEquals(7, collected.size());
+ Values firstRow = collected.get(0);
+ Assert.assertEquals(0, firstRow.get(0));
+ Assert.assertEquals(0, firstRow.get(1));
+ Assert.assertEquals(2L, firstRow.get(2));
+ }
+
+ @Test
+ public void testjoin() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
+ stmt.add("SELECT STREAM EMPID, EMPNAME, DEPTNAME FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ System.out.println(values);
+ Assert.assertEquals(3, values.size());
+ Assert.assertEquals("emp1", values.get(0).get(1));
+ Assert.assertEquals("dept1", values.get(0).get(2));
+ Assert.assertEquals("emp2", values.get(1).get(1));
+ Assert.assertEquals("dept1", values.get(1).get(2));
+ Assert.assertEquals("emp3", values.get(2).get(1));
+ Assert.assertEquals("dept2", values.get(2).get(2));
+ }
+
+ /**
+ * Joins the {@code mockemp} and {@code mockdept} tables on {@code DEPTID}, groups the result by
+ * department, and expects two rows: department 1 with count 2 and department 2 with count 1
+ * (counts are longs).
+ */
+ @Test
+ public void testjoinAndGroupby() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
+ // The second literal starts with a space: without it the concatenation reads "... > 0GROUP BY ...",
+ // so the statement depended on how the parser splits "0GROUP".
+ stmt.add("SELECT STREAM d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0" +
+ " GROUP BY d.DEPTID");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(2, values.size());
+ Assert.assertEquals(1, values.get(0).get(0));
+ Assert.assertEquals(2L, values.get(0).get(1));
+ Assert.assertEquals(2, values.get(1).get(0));
+ Assert.assertEquals(1L, values.get(1).get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
new file mode 100644
index 0000000..634e454
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Function;
+import org.apache.storm.sql.compiler.backends.standalone.TestCompilerUtils;
+import org.apache.storm.tuple.Values;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestExprSemantic {
+ // Calcite Java type factory created with the default type system.
+ // NOTE(review): its use is not visible in this part of the file - presumably consumed by the
+ // testExpr helper further down; confirm.
+ private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ /**
+ * Evaluates OR, AND and NOT over comparisons on the {@code ID} column through {@code testExpr}
+ * and expects {@code true, false, true}.
+ */
+ @Test
+ public void testLogicalExpr() throws Exception {
+ Values expected = new Values(true, false, true);
+ Values actual = testExpr(
+ Lists.newArrayList("ID > 0 OR ID < 1", "ID > 0 AND ID < 1",
+ "NOT (ID > 0 AND ID < 1)"));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks the {@code IS [NOT] TRUE / FALSE / NULL} predicates on the literals {@code TRUE} and
+ * {@code UNKNOWN}.
+ */
+ @Test
+ public void testExpectOperator() throws Exception {
+ Values expected = new Values(true, false, false, true, false, true, false);
+ Values actual = testExpr(
+ Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE",
+ "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
+ "TRUE IS FALSE", "UNKNOWN IS NULL",
+ "UNKNOWN IS NOT NULL"));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code IS [NOT] DISTINCT FROM}, {@code [NOT] BETWEEN}, {@code [NOT] LIKE},
+ * {@code [NOT] SIMILAR TO} and {@code [NOT] IN} on literal operands.
+ */
+ @Test
+ public void testDistinctBetweenLikeSimilarIn() throws Exception {
+ Values expected = new Values(false, false, true, true, true,
+ false, true, true, true, true);
+ Values actual = testExpr(
+ Lists.newArrayList("TRUE IS DISTINCT FROM TRUE",
+ "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5",
+ "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'",
+ "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO '[a-zA-Z]+[cd]{1}'",
+ "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', '2', '3', '4')",
+ "2 NOT IN (1, 3, 5)"));
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testCaseStatement() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
+ "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn CONCAT('abcd', '#')} END",
+ "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
+ "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', '#')} END",
+ "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
+ "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', '#')} END"
+ )
+ );
+
+ // TODO: The data type of literal Calcite assigns seems to be out of expectation. Please see below logical plan.
+ // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), =('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), CAST('b'):CHAR(4) C
HARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 io}, id = 5
+ // LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4
+ // EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative cost = {0.0 rows, 1.0 cpu, 0.0 io}, id = 3
+ // in result, both 'b' and UPPER('a') hence 'A' are having some spaces which is not expected.
+ // When we use CASE with actual column (Java String type hence VARCHAR), it seems to work as expected.
+ // Please refer trident/TestPlanCompiler#testCaseStatement(), and see below logical plan.
+ // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, '#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 3
+ List<Object> v2 = Lists.transform(v, new Function<Object, Object>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Object o) {
+ return ((String) o).trim();
+ }
+ });
+ assertArrayEquals(new Values("abcd#", "b", "A").toArray(), v2.toArray());
+ }
+
+ /**
+ * Checks {@code NULLIF} and {@code COALESCE} on integer literals; expected results are
+ * {@code null, 5, 5, 1}.
+ */
+ @Test
+ public void testNullIfAndCoalesce() throws Exception {
+ Values expected = new Values(null, 5, 5, 1);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 5, 4, NULL)", "COALESCE(1, 5)"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code ELEMENT} on a single-element array and {@code CARDINALITY} on a five-element
+ * array; expected results are 3 and 5.
+ */
+ @Test
+ public void testCollectionFunctions() throws Exception {
+ Values expected = new Values(3, 5);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 5])"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Applies {@code ELEMENT} to a three-element array; the test passes only if evaluating it
+ * throws a {@link RuntimeException}. If evaluation returns normally, {@code fail} is called.
+ */
+ @Test(expected = RuntimeException.class)
+ public void testElementFunctionMoreThanOneValue() throws Exception {
+ testExpr(Lists.newArrayList("ELEMENT(ARRAY[1, 2, 3])"));
+ // Only reached when no exception was raised above.
+ fail("ELEMENT with array which has multiple elements should throw exception in runtime.");
+ }
+
+ /**
+ * Checks that integer addition yields {@code null} whenever an operand is a NULL cast to INT,
+ * and 3 for {@code 1 + 2}.
+ */
+ @Test
+ public void testArithmeticWithNull() throws Exception {
+ Values expected = new Values(null, null, null, 3);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1", "CAST(NULL AS INT) + CAST(NULL AS INT)", "1 + 2"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code NOT} on {@code TRUE}, {@code FALSE} and {@code UNKNOWN}; expected results are
+ * {@code false, true, null}.
+ */
+ @Test
+ public void testNotWithNull() throws Exception {
+ Values expected = new Values(false, true, null);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "NOT TRUE", "NOT FALSE", "NOT UNKNOWN"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code AND} over all nine combinations of {@code TRUE}, {@code FALSE} and
+ * {@code UNKNOWN} operands.
+ */
+ @Test
+ public void testAndWithNull() throws Exception {
+ Values expected = new Values(null, false, null, true, false, null, false,
+ false, false);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND UNKNOWN",
+ "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
+ "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND UNKNOWN"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code AND} over comparisons on the {@code ADDR} and {@code NAME} columns, in both
+ * operand orders; expected results are {@code false, false, null, null}.
+ * NOTE(review): the expectations suggest the fixture row has NAME = 'x' and a null ADDR -
+ * confirm against the data source used by testExpr.
+ */
+ @Test
+ public void testAndWithNullable() throws Exception {
+ Values expected = new Values(false, false, null, null);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "ADDR = 'a' AND NAME = 'a'", "NAME = 'a' AND ADDR = 'a'", "NAME = 'x' AND ADDR = 'a'", "ADDR = 'a' AND NAME = 'x'"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code OR} over comparisons on the {@code ADDR} and {@code NAME} columns, in both
+ * operand orders; expected results are {@code null, null, true, true}.
+ * NOTE(review): the expectations suggest the fixture row has NAME = 'x' and a null ADDR -
+ * confirm against the data source used by testExpr.
+ */
+ @Test
+ public void testOrWithNullable() throws Exception {
+ Values expected = new Values(null, null, true, true);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "ADDR = 'a' OR NAME = 'a'", "NAME = 'a' OR ADDR = 'a' ", "NAME = 'x' OR ADDR = 'a' ", "ADDR = 'a' OR NAME = 'x'"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code OR} over all nine combinations of {@code TRUE}, {@code FALSE} and
+ * {@code UNKNOWN} operands.
+ */
+ @Test
+ public void testOrWithNull() throws Exception {
+ Values expected = new Values(true, null, null, true, true, true, true,
+ false, null);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR UNKNOWN",
+ "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
+ "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks {@code =} and {@code <>} on integer, string and {@code UNKNOWN} operands; any
+ * comparison involving {@code UNKNOWN} is expected to yield {@code null}.
+ */
+ @Test
+ public void testEquals() throws Exception {
+ Values expected = new Values(false, null, true, null, null, false,
+ true, null, false, null, null, true);
+ Values actual = testExpr(
+ Lists.newArrayList(
+ "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
+ "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'"
+ ));
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks numeric functions in two batches: POWER, ABS, MOD, CEIL and FLOOR are compared
+ * exactly, while SQRT, LN, LOG10 and EXP are compared after truncating each result to an int.
+ */
+ @Test
+ public void testArithmeticFunctions() throws Exception {
+ Values exactResults = testExpr(
+ Lists.newArrayList(
+ "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)",
+ "CEIL(123.45)", "FLOOR(123.45)"
+ ));
+ Values expectedExact = new Values(9.0d, 10, 1, -1, new BigDecimal(124), new BigDecimal(123));
+ assertEquals(expectedExact, exactResults);
+
+ // These functions return floating-point values, so comparing them with literals tends to fail;
+ // only the integer part of each result is compared instead.
+ Values floatingResults = testExpr(
+ Lists.newArrayList(
+ "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)"
+ ));
+ List<Object> integerParts = new ArrayList<>();
+ for (Object result : floatingResults) {
+ integerParts.add(((Number) result).intValue());
+ }
+
+ // Untruncated values: 15.9687, 2.7725, 4.0, 22026.465794
+ assertEquals(new Values(15, 2, 4, 22026), integerParts);
+ }
+
+ /**
+  * Exercises the character-string functions: concatenation, length, case
+  * conversion, POSITION, TRIM, OVERLAY, SUBSTRING and INITCAP.
+  */
+ @Test
+ public void testStringFunctions() throws Exception {
+   List<String> exprs = Lists.newArrayList(
+       "'ab' || 'cd'",
+       "CHAR_LENGTH('foo')",
+       "CHARACTER_LENGTH('foo')",
+       "UPPER('a')",
+       "LOWER('A')",
+       "POSITION('bc' IN 'abcd')",
+       "TRIM(BOTH ' ' FROM ' abcdeabcdeabc ')",
+       "TRIM(LEADING ' ' FROM ' abcdeabcdeabc ')",
+       "TRIM(TRAILING ' ' FROM ' abcdeabcdeabc ')",
+       "OVERLAY('abcde' PLACING 'bc' FROM 3)",
+       "SUBSTRING('abcde' FROM 3)",
+       "SUBSTRING('abcdeabcde' FROM 3 FOR 4)",
+       "INITCAP('foo')");
+   Values actual = testExpr(exprs);
+
+   // one expected value per expression above, in the same order
+   Values expected = new Values(
+       "abcd", 3, 3, "A", "a", 2,
+       "abcdeabcdeabc", "abcdeabcdeabc ", " abcdeabcdeabc",
+       "abbce", "cde", "cdea", "Foo");
+   assertEquals(expected, actual);
+ }
+
+ /**
+  * Exercises concatenation, POSITION and OVERLAY on binary string literals.
+  * Binary results are compared through their {@code toString()} form.
+  */
+ @Test
+ public void testBinaryStringFunctions() throws Exception {
+   // TODO: Calcite 1.9.0 has bugs on binary SUBSTRING functions
+   // as there's no SqlFunctions.substring(org.apache.calcite.avatica.util.ByteString, ...)
+   // commented out testing substring function
+   List<String> exprs = Lists.newArrayList(
+       "x'45F0AB' || x'45F0AB'",
+       "POSITION(x'F0' IN x'453423F0ABBC')",
+       "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3)"
+       // "SUBSTRING(x'453423F0ABBC' FROM 3)",
+       // "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4)"
+   );
+   Values actual = testExpr(exprs);
+
+   String concatenated = actual.get(0).toString();
+   assertEquals("45f0ab45f0ab", concatenated);
+
+   assertEquals(4, actual.get(1));
+
+   String overlaid = actual.get(2).toString();
+   assertEquals("45344534abbc45", overlaid);
+
+   // assertEquals("23f0abbc", v.get(3).toString());
+   // assertEquals("23f0ab", v.get(4).toString());
+ }
+
+ /**
+  * Checks the runtime values produced by DATE, TIME and TIMESTAMP literals.
+  * The expected numbers are consistent with days since 1970-01-01, milliseconds
+  * of the day and milliseconds since the epoch respectively.
+  */
+ @Test
+ public void testDateAndTimestampLiteral() throws Exception {
+   List<String> literals = Lists.newArrayList(
+       "DATE '1970-05-15' AS datefield",
+       "TIME '00:00:00' AS timefield",
+       "TIMESTAMP '2016-01-01 00:00:00' as timestampfield");
+   Values row = testExpr(literals);
+
+   assertEquals(3, row.size());
+
+   Object dateValue = row.get(0);
+   Object timeValue = row.get(1);
+   Object timestampValue = row.get(2);
+   assertEquals(134, dateValue);
+   assertEquals(0, timeValue);
+   assertEquals(1451606400000L, timestampValue);
+ }
+
+ /**
+  * Checks an INTERVAL literal and a parenthesised pair of DATE literals.
+  * Although only two select items are given, three output columns are asserted:
+  * the pair contributes two of them.
+  */
+ @Test
+ public void testInterval() throws Exception {
+   List<String> selectItems = Lists.newArrayList(
+       "INTERVAL '1-5' YEAR TO MONTH AS intervalfield",
+       "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field");
+   Values row = testExpr(selectItems);
+
+   assertEquals(3, row.size());
+
+   // '1-5' YEAR TO MONTH: the asserted value matches 1 * 12 + 5 months
+   Object interval = row.get(0);
+   assertEquals(17, interval);
+
+   // the two dates of the pair
+   Object firstDate = row.get(1);
+   Object secondDate = row.get(2);
+   assertEquals(0, firstDate);
+   assertEquals(14, secondDate);
+ }
+
+ @Test
+ public void testDateFunctions() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "LOCALTIME = CURRENT_TIME, LOCALTIMESTAMP = CURRENT_TIMESTAMP, CURRENT_DATE",
+ "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')",
+ "FLOOR(DATE '2016-01-23' TO MONTH)",
+ "CEIL(TIME '12:34:56' TO MINUTE)"
+ )
+ );
+
+ assertEquals(6, v.size());
+ assertTrue((boolean) v.get(0));
+ assertTrue((boolean) v.get(1));
+ // skip checking CURRENT_DATE since we don't inject dataContext so don't know about current timestamp
+ // we can do it from trident test
+ assertEquals(1L, v.get(3));
+ assertEquals(0L, v.get(4));
+ assertEquals(45300000, v.get(5));
+ }
+
+ /**
+  * Exercises the numeric functions through the JDBC escape syntax {@code {fn ...}}.
+  * Floating-point results are compared on their integer part only.
+  */
+ @Test
+ public void testJDBCNumericFunctions() throws Exception {
+   List<String> exactExprs = Lists.newArrayList(
+       "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 3)}", "{fn MOD(-10, 3)}");
+   Values exact = testExpr(exactExprs);
+
+   Values expectedExact = new Values(9.0d, 10, 1, -1);
+   assertEquals(expectedExact, exact);
+
+   // The functions below return floating-point numbers, so comparing them against
+   // literals tends to fail; compare only the integer part instead.
+   List<String> floatingExprs = Lists.newArrayList(
+       "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}");
+   Values floating = testExpr(floatingExprs);
+
+   Function<Object, Object> truncateToInt = new Function<Object, Object>() {
+     @Nullable
+     @Override
+     public Object apply(@Nullable Object o) {
+       // keep only the int value
+       Number number = (Number) o;
+       return number.intValue();
+     }
+   };
+   List<Object> truncated = Lists.transform(floating, truncateToInt);
+
+   // untruncated values: 2.7725, 4.0, 22026.465794
+   Values expectedTruncated = new Values(2, 4, 22026);
+   assertEquals(expectedTruncated, truncated);
+ }
+
+ /**
+  * Exercises the string functions through the JDBC escape syntax {@code {fn ...}}.
+  * Three expressions are disabled; see the TODO below.
+  */
+ @Test
+ public void testJDBCStringFunctions() throws Exception {
+   // TODO: Calcite 1.9.0 doesn't support {fn LOCATE(string1, string2 [, integer])}
+   // while it's on support list on SQL reference
+   // and bugs on LTRIM and RTRIM : throwing AssertionError: Internal error: pre-condition failed: pos != null
+   // commented out problematic function tests
+   List<String> exprs = Lists.newArrayList(
+       "{fn CONCAT('ab', 'cd')}",
+       "{fn LOCATE('bc', 'abcdeabcde')}",
+       //"{fn LOCATE('bc', 'abcdeabcde', 4)}",
+       "{fn INSERT('abcd', 2, 3, 'de')}",
+       "{fn LCASE('AbCdE')}",
+       "{fn LENGTH('AbCdE')}",
+       //"{fn LTRIM(' abcde ')}",
+       //"{fn RTRIM(' abcde ')}",
+       "{fn SUBSTRING('abcdeabcde', 3, 4)}",
+       "{fn UCASE('AbCdE')}");
+   Values actual = testExpr(exprs);
+
+   // one expected value per active expression, in the same order
+   Values expected = new Values("abcd", 2, "ade", "abcde", 5, "cdea", "ABCDE");
+   assertEquals(expected, actual);
+ }
+
+ /**
+  * Exercises the date/time functions through the JDBC escape syntax: the
+  * current-time functions are compared against their SQL counterparts, and
+  * QUARTER, TIMESTAMPADD and TIMESTAMPDIFF are compared against fixed values.
+  */
+ @Test
+ public void testJDBCDateTimeFunctions() throws Exception {
+   List<String> exprs = Lists.newArrayList(
+       // each of these is expected to be true
+       "{fn CURDATE()} = CURRENT_DATE",
+       "{fn CURTIME()} = LOCALTIME",
+       "{fn NOW()} = LOCALTIMESTAMP",
+       // fixed inputs
+       "{fn QUARTER(DATE '2016-10-07')}",
+       "{fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}",
+       "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}");
+   Values actual = testExpr(exprs);
+
+   Values expected = new Values(true, true, true, 4L, 1475799300000L, 86400);
+   assertEquals(expected, actual);
+ }
+
+ /**
+  * Evaluates the given select items against the mock FOO table and returns the
+  * first result row.
+  *
+  * <p>The items are joined with commas into
+  * {@code SELECT <items> FROM FOO WHERE ID > 0 AND ID < 2}, planned with
+  * {@link TestCompilerUtils#sqlOverDummyTable}, compiled with {@link PlanCompiler}
+  * and run over a {@code TestUtils.MockDataSource}. A single list element may
+  * itself contain several comma-separated select items.
+  *
+  * @param exprs SQL select items, in output-column order
+  * @return the first row collected by the channel handler
+  * @throws Exception if parsing, validation, planning or compilation fails
+  */
+ private Values testExpr(List<String> exprs) throws Exception {
+   String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
+       " WHERE ID > 0 AND ID < 2";
+   TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+   PlanCompiler compiler = new PlanCompiler(typeFactory);
+   AbstractValuesProcessor proc = compiler.compile(state.tree());
+
+   Map<String, DataSource> data = new HashMap<>();
+   data.put("FOO", new TestUtils.MockDataSource());
+   List<Values> values = new ArrayList<>();
+   ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+   // Rows are read from 'values' right after this call, so they are expected to
+   // have been collected by the time initialize() returns.
+   proc.initialize(data, h);
+
+   // Fail with the offending SQL rather than a bare IndexOutOfBoundsException
+   // when the query emits no rows.
+   assertTrue("query produced no rows: " + sql, !values.isEmpty());
+   return values.get(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
new file mode 100644
index 0000000..8e64e9c
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.compiler.CompilerUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestCompilerUtils {
+
+ public static class MyPlus {
+ public static Integer eval(Integer x, Integer y) {
+ return x + y;
+ }
+ }
+
+ public static class MyStaticSumFunction {
+ public static long init() {
+ return 0L;
+ }
+ public static long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+ }
+
+ public static class MySumFunction {
+ public MySumFunction() {
+ }
+ public long init() {
+ return 0L;
+ }
+ public long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+ public long result(long accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static CalciteState sqlOverDummyTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+ false,
+ Collections.<String>emptyList(), typeFactory));
+ SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+ FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+ schema).operatorTable(chainedSqlOperatorTable).build();
+ Planner planner = Frameworks.getPlanner(config);
+ SqlNode parse = planner.parse(sql);
+ SqlNode validate = planner.validate(parse);
+ RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverNestedTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("MAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ .field("NESTEDMAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ , true))
+ .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
+ typeFactory.createArrayType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
+ , true))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+ false,
+ Collections.<String>emptyList(), typeFactory));
+ SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+ FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+ schema).operatorTable(chainedSqlOperatorTable).build();
+ Planner planner = Frameworks.getPlanner(config);
+ SqlNode parse = planner.parse(sql);
+ SqlNode validate = planner.validate(parse);
+ RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static class CalciteState {
+ final SchemaPlus schema;
+ final RelNode tree;
+
+ private CalciteState(SchemaPlus schema, RelNode tree) {
+ this.schema = schema;
+ this.tree = tree;
+ }
+
+ public SchemaPlus schema() {
+ return schema;
+ }
+
+ public RelNode tree() {
+ return tree;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
new file mode 100644
index 0000000..3226810
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * End-to-end tests for the standalone {@link PlanCompiler}: each test plans a
+  * SQL statement, compiles it, feeds it a mock data source and checks the
+  * collected rows.
+  */
+ public class TestPlanCompiler {
+   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+       RelDataTypeSystem.DEFAULT);
+
+   /** A projection plus filter yields one row per matching input row. */
+   @Test
+   public void testCompile() throws Exception {
+     String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
+     List<Values> values = execute(
+         TestCompilerUtils.sqlOverDummyTable(sql), new TestUtils.MockDataSource());
+     Assert.assertArrayEquals(new Values[] { new Values(4), new Values(5)},
+         values.toArray());
+   }
+
+   /** OR, AND and NOT over integer comparisons in the select list. */
+   @Test
+   public void testLogicalExpr() throws Exception {
+     String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
+     List<Values> values = execute(
+         TestCompilerUtils.sqlOverDummyTable(sql), new TestUtils.MockDataSource());
+     Assert.assertEquals(new Values(true, false, true), values.get(0));
+   }
+
+   /** Item access on map, nested-map and array columns, in both SELECT and WHERE. */
+   @Test
+   public void testNested() throws Exception {
+     String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+         "FROM FOO " +
+         "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+     List<Values> values = execute(
+         TestCompilerUtils.sqlOverNestedTable(sql), new TestUtils.MockNestedDataSource());
+     Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+     Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+   }
+
+   /** A user-defined scalar function (MYPLUS) is callable from the select list. */
+   @Test
+   public void testUdf() throws Exception {
+     // The space after ')' keeps the concatenated statement from reading ")FROM".
+     String sql = "SELECT MYPLUS(ID, 3) " +
+         "FROM FOO " +
+         "WHERE ID = 2";
+     // NOTE(review): the statement is planned against the nested-table schema but
+     // executed over the flat MockDataSource; only ID is referenced. Confirm this
+     // mix is intentional.
+     List<Values> values = execute(
+         TestCompilerUtils.sqlOverNestedTable(sql), new TestUtils.MockDataSource());
+     Assert.assertEquals(new Values(5), values.get(0));
+   }
+
+   /**
+    * Compiles the planned tree, binds {@code source} as table FOO and returns
+    * the rows collected once the processor has been initialized.
+    *
+    * @param state  the planned statement
+    * @param source the data source registered under the name FOO
+    * @return the rows handed to the collecting channel handler
+    */
+   private List<Values> execute(TestCompilerUtils.CalciteState state, DataSource source)
+       throws Exception {
+     PlanCompiler compiler = new PlanCompiler(typeFactory);
+     AbstractValuesProcessor proc = compiler.compile(state.tree());
+     Map<String, DataSource> data = new HashMap<>();
+     data.put("FOO", source);
+     List<Values> values = new ArrayList<>();
+     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+     proc.initialize(data, h);
+     return values;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
new file mode 100644
index 0000000..4bee9aa
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+ /**
+  * Checks that {@link RelNodeCompiler} writes source text containing the
+  * filter condition and the projected expression of a simple query.
+  */
+ public class TestRelNodeCompiler {
+   /**
+    * Plans {@code SELECT ID + 1 FROM FOO WHERE ID > 3}, visits the filter and the
+    * project node separately, and looks for the condition and the expression in
+    * the text each visit writes.
+    */
+   @Test
+   public void testFilter() throws Exception {
+     String sql = "SELECT ID + 1 FROM FOO WHERE ID > 3";
+     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+         RelDataTypeSystem.DEFAULT);
+     // The casts rely on the plan being a project directly on top of a filter.
+     LogicalProject project = (LogicalProject) state.tree();
+     LogicalFilter filter = (LogicalFilter) project.getInput();
+
+     try (StringWriter sw = new StringWriter();
+          PrintWriter pw = new PrintWriter(sw)
+     ) {
+       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+       // standalone mode doesn't use inputstreams argument;
+       // emptyList() is the type-safe replacement for the raw EMPTY_LIST constant
+       compiler.visitFilter(filter, Collections.emptyList());
+       pw.flush();
+       Assert.assertThat(sw.toString(), containsString("> 3"));
+     }
+
+     try (StringWriter sw = new StringWriter();
+          PrintWriter pw = new PrintWriter(sw)
+     ) {
+       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+       // standalone mode doesn't use inputstreams argument
+       compiler.visitProject(project, Collections.emptyList());
+       pw.flush();
+       Assert.assertThat(sw.toString(), containsString(" + 1"));
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
new file mode 100644
index 0000000..f6ef1ca
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.apache.storm.sql.planner.StormRelUtils;
+
+public class TestCompilerUtils {
+
+ public static class MyPlus {
+ public static Integer eval(Integer x, Integer y) {
+ return x + y;
+ }
+ }
+
+ public static class MyStaticSumFunction {
+ public static long init() {
+ return 0L;
+ }
+ public static long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+ }
+
+ public static class MySumFunction {
+ public MySumFunction() {
+ }
+ public long init() {
+ return 0L;
+ }
+ public long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+ public long result(long accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static CalciteState sqlOverDummyTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ TridentRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverDummyGroupByTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("GRPID", SqlTypeName.INTEGER)
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .field("AGE", SqlTypeName.INTEGER)
+ .field("SCORE", SqlTypeName.INTEGER)
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
+ schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ TridentRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverNestedTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("MAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ .field("NESTEDMAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ , true))
+ .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
+ typeFactory.createArrayType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
+ , true))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ TridentRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("EMPID", SqlTypeName.INTEGER)
+ .field("EMPNAME", SqlTypeName.VARCHAR)
+ .field("DEPTID", SqlTypeName.INTEGER)
+ .build();
+ Table table = streamableTable.stream();
+
+ StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("DEPTID", SqlTypeName.INTEGER)
+ .field("DEPTNAME", SqlTypeName.VARCHAR)
+ .build();
+ Table table2 = streamableTable2.stream();
+
+ schema.add("EMP", table);
+ schema.add("DEPT", table2);
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ TridentRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static class CalciteState {
+ final SchemaPlus schema;
+ final RelNode tree;
+
+ private CalciteState(SchemaPlus schema, RelNode tree) {
+ this.schema = schema;
+ this.tree = tree;
+ }
+
+ public SchemaPlus schema() {
+ return schema;
+ }
+
+ public RelNode tree() {
+ return tree;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
new file mode 100644
index 0000000..d3f30b9
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.AbstractTridentProcessor;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+
+/**
+ * Tests that compile SQL statements with {@link QueryPlanner}, run the resulting
+ * Trident topology on a {@link LocalCluster} shared by all tests in this class, and
+ * compare the values reported by {@code TestUtils.MockState.getCollectedValues()}
+ * with the expected rows.
+ */
+public class TestPlanCompiler {
+  private static LocalCluster cluster;
+
+  @BeforeClass
+  public static void staticSetup() throws Exception {
+    cluster = new LocalCluster();
+  }
+
+  @AfterClass
+  public static void staticCleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /** Clears the values collected by a previous test so each test starts empty. */
+  @Before
+  public void setUp() {
+    getCollectedValues().clear();
+  }
+
+  @Test
+  public void testCompile() throws Exception {
+    final int expectedValueSize = 2;
+    String sql = "SELECT ID FROM FOO WHERE ID > 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    final TridentTopology topo = proc.build();
+    persistToMockState(proc);
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] {new Values(3), new Values(4)},
+        getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    final int expectedValueSize = 1;
+    String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    data.put("BAR", new TestUtils.MockSqlTridentDataSource());
+
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    final TridentTopology topo = proc.build();
+    // No mock state is attached here: unlike the SELECT tests, this test does not
+    // call partitionPersist on the output stream before running the topology.
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] {new Values(4, "abcde", "y")},
+        getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testUdf() throws Exception {
+    final int expectedValueSize = 1;
+    // NOTE(review): the first fragment has no trailing space, so the statement reads
+    // "...MYPLUS(ID, 3)FROM FOO ..."; left unchanged to keep the tested SQL identical.
+    String sql = "SELECT MYPLUS(ID, 3)" +
+        "FROM FOO " +
+        "WHERE ID = 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    final TridentTopology topo = proc.build();
+    persistToMockState(proc);
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] {new Values(5)}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testCaseStatement() throws Exception {
+    final int expectedValueSize = 5;
+    String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
+        "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END FROM FOO";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    final TridentTopology topo = proc.build();
+    persistToMockState(proc);
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    Assert.assertArrayEquals(
+        new Values[] {new Values("A"), new Values("b"), new Values("A"),
+            new Values("abcd#"), new Values("A")},
+        getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testNested() throws Exception {
+    final int expectedValueSize = 1;
+    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+        "FROM FOO " +
+        "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource());
+
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    final TridentTopology topo = proc.build();
+    persistToMockState(proc);
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+    Assert.assertArrayEquals(
+        new Values[] {new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
+        getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testDateKeywords() throws Exception {
+    final int expectedValueSize = 1;
+    String sql = "SELECT " +
+        "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE " +
+        "FROM FOO " +
+        "WHERE ID > 0 AND ID < 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    QueryPlanner planner = new QueryPlanner(state.schema());
+    AbstractTridentProcessor proc = planner.compile(data, sql);
+    // The expected values below are derived from the processor's own data context.
+    final DataContext dataContext = proc.getDataContext();
+    final TridentTopology topo = proc.build();
+    persistToMockState(proc);
+    runTridentTopology(expectedValueSize, proc, topo);
+
+    long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
+    long currentTimestamp =
+        (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
+    long localTimestamp = (long) dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName);
+
+    System.out.println(getCollectedValues());
+
+    java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp);
+    int dateInt =
+        (int) timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay();
+    int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+    int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+
+    Assert.assertArrayEquals(
+        new Values[] {
+            new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt)},
+        getCollectedValues().toArray());
+  }
+
+  /**
+   * Attaches the mock state to the processor's output stream, persisting all of the
+   * stream's output fields through {@code TestUtils.MockStateUpdater}.
+   */
+  private static void persistToMockState(AbstractTridentProcessor proc) {
+    Fields outputFields = proc.outputStream().getOutputFields();
+    proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
+        outputFields, new TestUtils.MockStateUpdater(), new Fields());
+  }
+
+  /**
+   * Submits {@code topo} to the shared cluster and waits until at least
+   * {@code expectedValueSize} values have been collected or the wait times out.
+   *
+   * <p>If the processor exposes class loaders, the last one is installed for Java
+   * deserialization for the duration of the run. It is always reset afterwards, even
+   * when waiting for the topology to disappear from the cluster fails.
+   */
+  private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
+                                  TridentTopology topo) throws Exception {
+    final Config conf = new Config();
+    conf.setMaxSpoutPending(20);
+
+    if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
+      CompilingClassLoader lastClassloader =
+          proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
+      Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
+    }
+
+    try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
+      // Keep waiting while fewer values than expected have arrived.
+      waitForCompletion(1000 * 1000, () -> getCollectedValues().size() < expectedValueSize);
+    } finally {
+      try {
+        // Block until the cluster reports no topologies, so the next test starts clean.
+        while (cluster.getClusterInfo().get_topologies_size() > 0) {
+          Thread.sleep(10);
+        }
+      } finally {
+        // Must run even if the polling above throws; otherwise the class loader set
+        // for this test would stay installed for the tests that follow.
+        Utils.resetClassLoaderForJavaDeSerialize();
+      }
+    }
+  }
+
+  /**
+   * Polls {@code cond} every 100 ms for as long as it returns {@code true} and fewer
+   * than {@code timeout} units of {@code TestUtils.monotonicNow()} have elapsed
+   * (presumably milliseconds; confirm against {@code TestUtils}). Returns silently on
+   * timeout; callers detect that case through their own assertions.
+   */
+  private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception {
+    long start = TestUtils.monotonicNow();
+    while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
+      Thread.sleep(100);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
new file mode 100644
index 0000000..68054d8
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.storm.sql.parser.impl.ParseException;
+import org.junit.Test;
+
+/**
+ * Parser smoke tests: each case feeds one statement to {@link StormParser} and
+ * passes if parsing completes (or, where declared, fails with
+ * {@link ParseException}).
+ */
+public class TestSqlParser {
+  /** Parses {@code statement} as a single SQL statement followed by end of input. */
+  private static SqlNode parse(String statement) throws Exception {
+    return new StormParser(statement).impl().parseSqlStmtEof();
+  }
+
+  @Test
+  public void testCreateTable() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT) LOCATION 'kafka:///foo'");
+  }
+
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT PRIMARY KEY ASC) LOCATION 'kafka:///foo'");
+  }
+
+  // The statement below has no LOCATION clause, which the test expects to be rejected.
+  @Test(expected = ParseException.class)
+  public void testCreateTableWithoutLocation() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT)");
+  }
+
+  @Test
+  public void testCreateFunction() throws Exception {
+    parse("CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/pom.xml b/sql/storm-sql-external/storm-sql-hdfs/pom.xml
new file mode 100644
index 0000000..9275fbb
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-hdfs</artifactId>
+
+ <developers>
+ <developer>
+ <id>vesense</id>
+ <name>Xin Wang</name>
+ <email>data.xinwang@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+ see: http://stackoverflow.com/q/20469026/3542091 -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>