You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:17 UTC
[14/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
deleted file mode 100644
index 634e454..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.base.Function;
-import org.apache.storm.sql.compiler.backends.standalone.TestCompilerUtils;
-import org.apache.storm.tuple.Values;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestExprSemantic {
- private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
-
- @Test
- public void testLogicalExpr() throws Exception {
- Values v = testExpr(
- Lists.newArrayList("ID > 0 OR ID < 1", "ID > 0 AND ID < 1",
- "NOT (ID > 0 AND ID < 1)"));
- assertEquals(new Values(true, false, true), v);
- }
-
- @Test
- public void testExpectOperator() throws Exception {
- Values v = testExpr(
- Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE",
- "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
- "TRUE IS FALSE", "UNKNOWN IS NULL",
- "UNKNOWN IS NOT NULL"));
- assertEquals(new Values(true, false, false, true, false, true, false), v);
- }
-
- @Test
- public void testDistinctBetweenLikeSimilarIn() throws Exception {
- Values v = testExpr(
- Lists.newArrayList("TRUE IS DISTINCT FROM TRUE",
- "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5",
- "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'",
- "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO '[a-zA-Z]+[cd]{1}'",
- "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', '2', '3', '4')",
- "2 NOT IN (1, 3, 5)"));
- assertEquals(new Values(false, false, true, true, true,
- false, true, true, true, true), v);
- }
-
- @Test
- public void testCaseStatement() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
- "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn CONCAT('abcd', '#')} END",
- "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
- "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', '#')} END",
- "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
- "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', '#')} END"
- )
- );
-
- // TODO: The data type of literal Calcite assigns seems to be out of expectation. Please see below logical plan.
- // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), =('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), CAST('b'):CHAR(4) C
HARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, ||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 io}, id = 5
- // LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4
- // EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative cost = {0.0 rows, 1.0 cpu, 0.0 io}, id = 3
- // in result, both 'b' and UPPER('a') hence 'A' are having some spaces which is not expected.
- // When we use CASE with actual column (Java String type hence VARCHAR), it seems to work as expected.
- // Please refer trident/TestPlanCompiler#testCaseStatement(), and see below logical plan.
- // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, '#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 3
- List<Object> v2 = Lists.transform(v, new Function<Object, Object>() {
- @Nullable
- @Override
- public String apply(@Nullable Object o) {
- return ((String) o).trim();
- }
- });
- assertArrayEquals(new Values("abcd#", "b", "A").toArray(), v2.toArray());
- }
-
- @Test
- public void testNullIfAndCoalesce() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 5, 4, NULL)", "COALESCE(1, 5)"
- ));
- assertEquals(new Values(null, 5, 5, 1), v);
- }
-
- @Test
- public void testCollectionFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 5])"
- ));
- assertEquals(new Values(3, 5), v);
- }
-
- @Test(expected = RuntimeException.class)
- public void testElementFunctionMoreThanOneValue() throws Exception {
- testExpr(
- Lists.newArrayList(
- "ELEMENT(ARRAY[1, 2, 3])"
- ));
- fail("ELEMENT with array which has multiple elements should throw exception in runtime.");
- }
-
- @Test
- public void testArithmeticWithNull() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1", "CAST(NULL AS INT) + CAST(NULL AS INT)", "1 + 2"
- ));
- assertEquals(new Values(null, null, null, 3), v);
- }
-
- @Test
- public void testNotWithNull() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "NOT TRUE", "NOT FALSE", "NOT UNKNOWN"
- ));
- assertEquals(new Values(false, true, null), v);
- }
-
- @Test
- public void testAndWithNull() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND UNKNOWN",
- "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
- "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND UNKNOWN"
- ));
- assertEquals(new Values(null, false, null, true, false, null, false,
- false, false), v);
- }
-
- @Test
- public void testAndWithNullable() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "ADDR = 'a' AND NAME = 'a'", "NAME = 'a' AND ADDR = 'a'", "NAME = 'x' AND ADDR = 'a'", "ADDR = 'a' AND NAME = 'x'"
- ));
- assertEquals(new Values(false, false, null, null), v);
- }
-
- @Test
- public void testOrWithNullable() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "ADDR = 'a' OR NAME = 'a'", "NAME = 'a' OR ADDR = 'a' ", "NAME = 'x' OR ADDR = 'a' ", "ADDR = 'a' OR NAME = 'x'"
- ));
- assertEquals(new Values(null, null, true, true), v);
- }
-
- @Test
- public void testOrWithNull() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR UNKNOWN",
- "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
- "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN"
- ));
- assertEquals(new Values(true, null, null, true, true, true, true,
- false, null), v);
- }
-
- @Test
- public void testEquals() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
- "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'"
- ));
- assertEquals(new Values(false, null, true, null, null, false,
- true, null, false, null, null, true), v);
- }
-
- @Test
- public void testArithmeticFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)",
- "CEIL(123.45)", "FLOOR(123.45)"
- ));
-
- assertEquals(new Values(9.0d, 10, 1, -1, new BigDecimal(124), new BigDecimal(123)), v);
-
- // Belows are floating numbers so comparing this with literal is tend to be failing...
- // Picking int value and compare
- Values v2 = testExpr(
- Lists.newArrayList(
- "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)"
- ));
- List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
- @Nullable
- @Override
- public Object apply(@Nullable Object o) {
- // only takes int value
- return ((Number) o).intValue();
- }
- });
-
- // 15.9687, 2.7725, 4.0, 22026.465794
- assertEquals(new Values(15, 2, 4, 22026), v2m);
- }
-
- @Test
- public void testStringFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "'ab' || 'cd'", "CHAR_LENGTH('foo')", "CHARACTER_LENGTH('foo')",
- "UPPER('a')", "LOWER('A')", "POSITION('bc' IN 'abcd')",
- "TRIM(BOTH ' ' FROM ' abcdeabcdeabc ')",
- "TRIM(LEADING ' ' FROM ' abcdeabcdeabc ')",
- "TRIM(TRAILING ' ' FROM ' abcdeabcdeabc ')",
- "OVERLAY('abcde' PLACING 'bc' FROM 3)",
- "SUBSTRING('abcde' FROM 3)", "SUBSTRING('abcdeabcde' FROM 3 FOR 4)",
- "INITCAP('foo')"
- ));
- assertEquals(new Values("abcd", 3, 3, "A", "a", 2, "abcdeabcdeabc", "abcdeabcdeabc ", " abcdeabcdeabc", "abbce", "cde", "cdea", "Foo"), v);
- }
-
- @Test
- public void testBinaryStringFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "x'45F0AB' || x'45F0AB'",
- "POSITION(x'F0' IN x'453423F0ABBC')",
- "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3)"
- // "SUBSTRING(x'453423F0ABBC' FROM 3)",
- // "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4)"
- ));
-
- // TODO: Calcite 1.9.0 has bugs on binary SUBSTRING functions
- // as there's no SqlFunctions.substring(org.apache.calcite.avatica.util.ByteString, ...)
- // commented out testing substring function
-
- assertEquals("45f0ab45f0ab", v.get(0).toString());
- assertEquals(4, v.get(1));
- assertEquals("45344534abbc45", v.get(2).toString());
- // assertEquals("23f0abbc", v.get(3).toString());
- // assertEquals("23f0ab", v.get(4).toString());
- }
-
- @Test
- public void testDateAndTimestampLiteral() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "DATE '1970-05-15' AS datefield",
- "TIME '00:00:00' AS timefield",
- "TIMESTAMP '2016-01-01 00:00:00' as timestampfield"
- )
- );
-
- assertEquals(3, v.size());
- assertEquals(134, v.get(0));
- assertEquals(0, v.get(1));
- assertEquals(1451606400000L, v.get(2));
- }
-
- @Test
- public void testInterval() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "INTERVAL '1-5' YEAR TO MONTH AS intervalfield",
- "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field"
- )
- );
-
- assertEquals(3, v.size());
- assertEquals(17, v.get(0));
- assertEquals(0, v.get(1));
- assertEquals(14, v.get(2));
- }
-
- @Test
- public void testDateFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "LOCALTIME = CURRENT_TIME, LOCALTIMESTAMP = CURRENT_TIMESTAMP, CURRENT_DATE",
- "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')",
- "FLOOR(DATE '2016-01-23' TO MONTH)",
- "CEIL(TIME '12:34:56' TO MINUTE)"
- )
- );
-
- assertEquals(6, v.size());
- assertTrue((boolean) v.get(0));
- assertTrue((boolean) v.get(1));
- // skip checking CURRENT_DATE since we don't inject dataContext so don't know about current timestamp
- // we can do it from trident test
- assertEquals(1L, v.get(3));
- assertEquals(0L, v.get(4));
- assertEquals(45300000, v.get(5));
- }
-
- @Test
- public void testJDBCNumericFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 3)}", "{fn MOD(-10, 3)}"
- ));
-
- assertEquals(new Values(9.0d, 10, 1, -1), v);
-
- // Belows are floating numbers so comparing this with literal is tend to be failing...
- // Picking int value and compare
- Values v2 = testExpr(
- Lists.newArrayList(
- "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}"
- ));
- List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
- @Nullable
- @Override
- public Object apply(@Nullable Object o) {
- // only takes int value
- return ((Number) o).intValue();
- }
- });
-
- // 2.7725, 4.0, 22026.465794
- assertEquals(new Values(2, 4, 22026), v2m);
- }
-
- @Test
- public void testJDBCStringFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "{fn CONCAT('ab', 'cd')}",
- "{fn LOCATE('bc', 'abcdeabcde')}",
- //"{fn LOCATE('bc', 'abcdeabcde', 4)}",
- "{fn INSERT('abcd', 2, 3, 'de')}",
- "{fn LCASE('AbCdE')}",
- "{fn LENGTH('AbCdE')}",
- //"{fn LTRIM(' abcde ')}",
- //"{fn RTRIM(' abcde ')}",
- "{fn SUBSTRING('abcdeabcde', 3, 4)}",
- "{fn UCASE('AbCdE')}"
- )
- );
-
- // TODO: Calcite 1.9.0 doesn't support {fn LOCATE(string1, string2 [, integer])}
- // while it's on support list on SQL reference
- // and bugs on LTRIM and RTRIM : throwing AssertionError: Internal error: pre-condition failed: pos != null
- // commented out problematic function tests
-
- assertEquals(new Values("abcd", 2, "ade", "abcde", 5, "cdea", "ABCDE"), v);
- }
-
- @Test
- public void testJDBCDateTimeFunctions() throws Exception {
- Values v = testExpr(
- Lists.newArrayList(
- "{fn CURDATE()} = CURRENT_DATE", "{fn CURTIME()} = LOCALTIME", "{fn NOW()} = LOCALTIMESTAMP",
- "{fn QUARTER(DATE '2016-10-07')}", "{fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}",
- "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}"
- )
- );
-
- assertEquals(new Values(true, true, true, 4L, 1475799300000L, 86400), v);
- }
-
- private Values testExpr(List<String> exprs) throws Exception {
- String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
- " WHERE ID > 0 AND ID < 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree());
- Map<String, DataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockDataSource());
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- proc.initialize(data, h);
- return values.get(0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
deleted file mode 100644
index 8e64e9c..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.compiler.CompilerUtil;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestCompilerUtils {
-
- public static class MyPlus {
- public static Integer eval(Integer x, Integer y) {
- return x + y;
- }
- }
-
- public static class MyStaticSumFunction {
- public static long init() {
- return 0L;
- }
- public static long add(long accumulator, int v) {
- return accumulator + v;
- }
- }
-
- public static class MySumFunction {
- public MySumFunction() {
- }
- public long init() {
- return 0L;
- }
- public long add(long accumulator, int v) {
- return accumulator + v;
- }
- public long result(long accumulator) {
- return accumulator;
- }
- }
-
- public static CalciteState sqlOverDummyTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("NAME", typeFactory.createType(String.class))
- .field("ADDR", typeFactory.createType(String.class))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
- false,
- Collections.<String>emptyList(), typeFactory));
- SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
- FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
- schema).operatorTable(chainedSqlOperatorTable).build();
- Planner planner = Frameworks.getPlanner(config);
- SqlNode parse = planner.parse(sql);
- SqlNode validate = planner.validate(parse);
- RelNode tree = planner.convert(validate);
- System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverNestedTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
-
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("MAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- .field("NESTEDMAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- , true))
- .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
- typeFactory.createArrayType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
- , true))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
- false,
- Collections.<String>emptyList(), typeFactory));
- SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
- FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
- schema).operatorTable(chainedSqlOperatorTable).build();
- Planner planner = Frameworks.getPlanner(config);
- SqlNode parse = planner.parse(sql);
- SqlNode validate = planner.validate(parse);
- RelNode tree = planner.convert(validate);
- System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static class CalciteState {
- final SchemaPlus schema;
- final RelNode tree;
-
- private CalciteState(SchemaPlus schema, RelNode tree) {
- this.schema = schema;
- this.tree = tree;
- }
-
- public SchemaPlus schema() {
- return schema;
- }
-
- public RelNode tree() {
- return tree;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
deleted file mode 100644
index 3226810..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.storm.tuple.Values;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestPlanCompiler {
- private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
-
- @Test
- public void testCompile() throws Exception {
- String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree());
- Map<String, DataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockDataSource());
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- proc.initialize(data, h);
- Assert.assertArrayEquals(new Values[] { new Values(4), new Values(5)},
- values.toArray());
- }
-
- @Test
- public void testLogicalExpr() throws Exception {
- String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree());
- Map<String, DataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockDataSource());
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- proc.initialize(data, h);
- Assert.assertEquals(new Values(true, false, true), values.get(0));
- }
-
- @Test
- public void testNested() throws Exception {
- String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
- "FROM FOO " +
- "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree());
- Map<String, DataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockNestedDataSource());
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- proc.initialize(data, h);
- Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
- Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
- Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
- }
-
- @Test
- public void testUdf() throws Exception {
- String sql = "SELECT MYPLUS(ID, 3)" +
- "FROM FOO " +
- "WHERE ID = 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree());
- Map<String, DataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockDataSource());
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- proc.initialize(data, h);
- Assert.assertEquals(new Values(5), values.get(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
deleted file mode 100644
index 4bee9aa..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.containsString;
-
-public class TestRelNodeCompiler {
- @Test
- public void testFilter() throws Exception {
- String sql = "SELECT ID + 1 FROM FOO WHERE ID > 3";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
- LogicalProject project = (LogicalProject) state.tree();
- LogicalFilter filter = (LogicalFilter) project.getInput();
-
- try (StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw)
- ) {
- RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
- // standalone mode doesn't use inputstreams argument
- compiler.visitFilter(filter, Collections.EMPTY_LIST);
- pw.flush();
- Assert.assertThat(sw.toString(), containsString("> 3"));
- }
-
- try (StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw)
- ) {
- RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
- // standalone mode doesn't use inputstreams argument
- compiler.visitProject(project, Collections.EMPTY_LIST);
- pw.flush();
- Assert.assertThat(sw.toString(), containsString(" + 1"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
deleted file mode 100644
index f6ef1ca..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.trident;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.apache.storm.sql.planner.StormRelUtils;
-
-public class TestCompilerUtils {
-
- public static class MyPlus {
- public static Integer eval(Integer x, Integer y) {
- return x + y;
- }
- }
-
- public static class MyStaticSumFunction {
- public static long init() {
- return 0L;
- }
- public static long add(long accumulator, int v) {
- return accumulator + v;
- }
- }
-
- public static class MySumFunction {
- public MySumFunction() {
- }
- public long init() {
- return 0L;
- }
- public long add(long accumulator, int v) {
- return accumulator + v;
- }
- public long result(long accumulator) {
- return accumulator;
- }
- }
-
- public static CalciteState sqlOverDummyTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("NAME", typeFactory.createType(String.class))
- .field("ADDR", typeFactory.createType(String.class))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverDummyGroupByTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("GRPID", SqlTypeName.INTEGER)
- .field("NAME", typeFactory.createType(String.class))
- .field("ADDR", typeFactory.createType(String.class))
- .field("AGE", SqlTypeName.INTEGER)
- .field("SCORE", SqlTypeName.INTEGER)
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
- schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverNestedTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
-
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("MAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- .field("NESTEDMAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- , true))
- .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
- typeFactory.createArrayType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
- , true))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
-
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("EMPID", SqlTypeName.INTEGER)
- .field("EMPNAME", SqlTypeName.VARCHAR)
- .field("DEPTID", SqlTypeName.INTEGER)
- .build();
- Table table = streamableTable.stream();
-
- StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("DEPTID", SqlTypeName.INTEGER)
- .field("DEPTNAME", SqlTypeName.VARCHAR)
- .build();
- Table table2 = streamableTable2.stream();
-
- schema.add("EMP", table);
- schema.add("DEPT", table2);
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static class CalciteState {
- final SchemaPlus schema;
- final RelNode tree;
-
- private CalciteState(SchemaPlus schema, RelNode tree) {
- this.schema = schema;
- this.tree = tree;
- }
-
- public SchemaPlus schema() {
- return schema;
- }
-
- public RelNode tree() {
- return tree;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
deleted file mode 100644
index 9ba7267..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * * <p>
- * * http://www.apache.org/licenses/LICENSE-2.0
- * * <p>
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.storm.sql.compiler.backends.trident;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.AbstractTridentProcessor;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.time.ZoneOffset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
-
-public class TestPlanCompiler {
- private static LocalCluster cluster;
-
- @BeforeClass
- public static void staticSetup() throws Exception {
- cluster = new LocalCluster();
- }
-
- @AfterClass
- public static void staticCleanup() {
- if (cluster!= null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- @Before
- public void setUp() {
- getCollectedValues().clear();
- }
-
- @Test
- public void testCompile() throws Exception {
- final int EXPECTED_VALUE_SIZE = 2;
- String sql = "SELECT ID FROM FOO WHERE ID > 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- final Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentDataSource());
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final TridentTopology topo = proc.build();
- Fields f = proc.outputStream().getOutputFields();
- proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
- f, new TestUtils.MockStateUpdater(), new Fields());
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray());
- }
-
- @Test
- public void testInsert() throws Exception {
- final int EXPECTED_VALUE_SIZE = 1;
- String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- final Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentDataSource());
- data.put("BAR", new TestUtils.MockSqlTridentDataSource());
-
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final TridentTopology topo = proc.build();
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, getCollectedValues().toArray());
- }
-
- @Test
- public void testUdf() throws Exception {
- int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT MYPLUS(ID, 3)" +
- "FROM FOO " +
- "WHERE ID = 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
- Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentDataSource());
-
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final TridentTopology topo = proc.build();
- Fields f = proc.outputStream().getOutputFields();
- proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
- f, new TestUtils.MockStateUpdater(), new Fields());
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(5) }, getCollectedValues().toArray());
- }
-
- @Test
- public void testCaseStatement() throws Exception {
- int EXPECTED_VALUE_SIZE = 5;
- String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN UPPER('a') " +
- "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END FROM FOO";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-
- final Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentDataSource());
-
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final TridentTopology topo = proc.build();
- Fields f = proc.outputStream().getOutputFields();
- proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields());
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-
- Assert.assertArrayEquals(new Values[]{new Values("A"), new Values("b"), new Values("A"), new Values("abcd#"), new Values("A")}, getCollectedValues().toArray());
- }
-
- @Test
- public void testNested() throws Exception {
- int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
- "FROM FOO " +
- "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
-
- final Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource());
-
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final TridentTopology topo = proc.build();
- Fields f = proc.outputStream().getOutputFields();
- proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields());
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-
- Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
- Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
- Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))}, getCollectedValues().toArray());
- }
-
- @Test
- public void testDateKeywords() throws Exception {
- int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT " +
- "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE " +
- "FROM FOO " +
- "WHERE ID > 0 AND ID < 2";
- TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-
- final Map<String, ISqlTridentDataSource> data = new HashMap<>();
- data.put("FOO", new TestUtils.MockSqlTridentDataSource());
- QueryPlanner planner = new QueryPlanner(state.schema());
- AbstractTridentProcessor proc = planner.compile(data, sql);
- final DataContext dataContext = proc.getDataContext();
- final TridentTopology topo = proc.build();
- Fields f = proc.outputStream().getOutputFields();
- proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields());
- runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-
- long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
- long currentTimestamp = (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
- long localTimestamp = (long) dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName);
-
- System.out.println(getCollectedValues());
-
- java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp);
- int dateInt = (int) timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay();
- int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
- int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
-
- Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt)}, getCollectedValues().toArray());
- }
-
- private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
- TridentTopology topo) throws Exception {
- final Config conf = new Config();
- conf.setMaxSpoutPending(20);
-
- if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
- CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
- Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
- }
-
- try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
- waitForCompletion(1000 * 1000, new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return getCollectedValues().size() < expectedValueSize;
- }
- });
- } finally {
- while(cluster.getClusterInfo().get_topologies_size() > 0) {
- Thread.sleep(10);
- }
- Utils.resetClassLoaderForJavaDeSerialize();
- }
- }
-
- private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception {
- long start = TestUtils.monotonicNow();
- while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
- Thread.sleep(100);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
deleted file mode 100644
index 68054d8..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlNode;
-import org.apache.storm.sql.parser.impl.ParseException;
-import org.junit.Test;
-
-public class TestSqlParser {
- @Test
- public void testCreateTable() throws Exception {
- String sql = "CREATE EXTERNAL TABLE foo (bar INT) LOCATION 'kafka:///foo'";
- parse(sql);
- }
-
- @Test
- public void testCreateTableWithPrimaryKey() throws Exception {
- String sql = "CREATE EXTERNAL TABLE foo (bar INT PRIMARY KEY ASC) LOCATION 'kafka:///foo'";
- parse(sql);
- }
-
- @Test(expected = ParseException.class)
- public void testCreateTableWithoutLocation() throws Exception {
- String sql = "CREATE EXTERNAL TABLE foo (bar INT)";
- parse(sql);
- }
-
- @Test
- public void testCreateFunction() throws Exception {
- String sql = "CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'";
- parse(sql);
- }
-
- private static SqlNode parse(String sql) throws Exception {
- StormParser parser = new StormParser(sql);
- return parser.impl().parseSqlStmtEof();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml b/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
deleted file mode 100644
index 9e0a599..0000000
--- a/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-hdfs</artifactId>
-
- <developers>
- <developer>
- <id>vesense</id>
- <name>Xin Wang</name>
- <email>data.xinwang@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- <exclusions>
- <!--log4j-over-slf4j must be excluded for hadoop-minicluster
- see: http://stackoverflow.com/q/20469026/3542091 -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-hdfs</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
deleted file mode 100644
index 38c3fcb..0000000
--- a/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.hdfs;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.hdfs.trident.HdfsState;
-import org.apache.storm.hdfs.trident.HdfsStateFactory;
-import org.apache.storm.hdfs.trident.HdfsUpdater;
-import org.apache.storm.hdfs.trident.format.FileNameFormat;
-import org.apache.storm.hdfs.trident.format.RecordFormat;
-import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
-import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Create a HDFS sink based on the URI and properties. The URI has the format of hdfs://host:port/path-to-file
- * The properties are in JSON format which specifies the name / path of the hdfs file and etc.
- */
-public class HdfsDataSourcesProvider implements DataSourcesProvider {
-
- private static class HdfsTridentDataSource implements ISqlTridentDataSource {
- private final String url;
- private final Properties props;
- private final IOutputSerializer serializer;
-
- private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
- this.url = url;
- this.props = props;
- this.serializer = serializer;
- }
-
- @Override
- public ITridentDataSource getProducer() {
- throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
- }
-
- @Override
- public SqlTridentConsumer getConsumer() {
- FileNameFormat fileNameFormat = new SimpleFileNameFormat()
- .withPath(props.getProperty("hdfs.file.path", "/storm"))
- .withName(props.getProperty("hdfs.file.name", "$TIME.$NUM.txt"));
-
- RecordFormat recordFormat = new TridentRecordFormat(serializer);
-
- FileRotationPolicy rotationPolicy;
- String size = props.getProperty("hdfs.rotation.size.kb");
- String interval = props.getProperty("hdfs.rotation.time.seconds");
- Preconditions.checkArgument(size != null || interval != null, "Hdfs data source must contain file rotation config");
-
- if (size != null) {
- rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
- } else {
- rotationPolicy = new TimedRotationPolicy(Float.parseFloat(interval), TimedRotationPolicy.TimeUnit.SECONDS);
- }
-
- HdfsState.Options options = new HdfsState.HdfsFileOptions()
- .withFileNameFormat(fileNameFormat)
- .withRecordFormat(recordFormat)
- .withRotationPolicy(rotationPolicy)
- .withFsUrl(url);
-
- StateFactory stateFactory = new HdfsStateFactory().withOptions(options);
- StateUpdater stateUpdater = new HdfsUpdater();
-
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
- }
- }
-
- private static class TridentRecordFormat implements RecordFormat {
- private final IOutputSerializer serializer;
-
- private TridentRecordFormat(IOutputSerializer serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public byte[] format(TridentTuple tuple) {
- //TODO we should handle '\n'. ref DelimitedRecordFormat
- return serializer.write(tuple.getValues(), null).array();
- }
-
- }
-
- @Override
- public String scheme() {
- return "hdfs";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
- IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
- return new HdfsTridentDataSource(uri.toString(), properties, serializer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 5fac84f..0000000
--- a/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.hdfs.HdfsDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
deleted file mode 100644
index 1473438..0000000
--- a/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.hdfs;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.storm.hdfs.trident.HdfsState;
-import org.apache.storm.hdfs.trident.HdfsStateFactory;
-import org.apache.storm.hdfs.trident.HdfsUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class TestHdfsDataSourcesProvider {
- private static final List<FieldInfo> FIELDS = ImmutableList.of(
- new FieldInfo("ID", int.class, true),
- new FieldInfo("val", String.class, false));
- private static final Properties TBL_PROPERTIES = new Properties();
-
- private static String hdfsURI;
- private static MiniDFSCluster hdfsCluster;
-
- static {
- TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
- TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
- TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
- }
-
- @Before
- public void setup() throws Exception {
- Configuration conf = new Configuration();
- conf.set("fs.trash.interval", "10");
- conf.setBoolean("dfs.permissions", true);
- File baseDir = new File("./target/hdfs/").getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
- hdfsCluster = builder.build();
- hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
- }
-
- @After
- public void shutDown() throws IOException {
- hdfsCluster.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testHdfsSink() {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
-
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
- Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
-
- HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- StateUpdater stateUpdater = consumer.getStateUpdater();
-
- HdfsFileOptions options = mock(HdfsFileOptions.class);
- Whitebox.setInternalState(state, "options", options);
-
- List<TridentTuple> tupleList = mockTupleList();
-
- for (TridentTuple t : tupleList) {
- stateUpdater.updateState(state, Collections.singletonList(t), null);
- try {
- verify(options).execute(Collections.singletonList(t));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
deleted file mode 100644
index a7bdf7a..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-kafka</artifactId>
-
- <developers>
- <developer>
- <id>haohui</id>
- <name>Haohui Mai</name>
- <email>ricetons@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>${storm.kafka.artifact.id}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${storm.kafka.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- </build>
-</project>