You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:20 UTC
[17/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
deleted file mode 100644
index 776a54c..0000000
--- a/external/sql/storm-sql-core/pom.xml
+++ /dev/null
@@ -1,279 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-core</artifactId>
-
- <developers>
- <developer>
- <id>haohui</id>
- <name>Haohui Mai</name>
- <email>ricetons@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- <version>${calcite.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>4.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <plugins>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-fmpp-resources</id>
- <phase>initialize</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/codegen</outputDirectory>
- <resources>
- <resource>
- <directory>src/codegen</directory>
- <filtering>false</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- <execution>
- <id>copy-java-sources</id>
- <phase>process-sources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes/</outputDirectory>
- <resources>
- <resource>
- <directory>src/jvm</directory>
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>src/test</directory>
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>target/generated-sources</directory>
- <!-- <include>*/org</include> -->
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <!-- Extract parser grammar template from calcite-core.jar and put
- it under ${project.build.directory} where all freemarker templates are. -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>unpack-parser-template</id>
- <phase>initialize</phase>
- <goals>
- <goal>unpack</goal>
- </goals>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- <type>jar</type>
- <overWrite>true</overWrite>
- <outputDirectory>${project.build.directory}/</outputDirectory>
- <includes>**/Parser.jj</includes>
- </artifactItem>
- </artifactItems>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- using appassembler-maven-plugin instead of maven-dependency-plugin to copy dependencies
- as copy and unpack goal are not working together -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>appassembler-maven-plugin</artifactId>
- <version>1.9</version>
- <executions>
- <execution>
- <id>create-repo</id>
- <goals>
- <goal>create-repository</goal>
- </goals>
- <configuration>
- <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
- <repositoryLayout>flat</repositoryLayout>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.googlecode.fmpp-maven-plugin</groupId>
- <artifactId>fmpp-maven-plugin</artifactId>
- <version>1.0</version>
- <dependencies>
- <dependency>
- <groupId>org.freemarker</groupId>
- <artifactId>freemarker</artifactId>
- <version>2.3.25-incubating</version>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <id>generate-fmpp-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>generate</goal>
- </goals>
- <configuration>
- <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
- <outputDirectory>target/generated-sources</outputDirectory>
- <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>add-generated-sources</id>
- <phase>process-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.build.directory}/generated-sources</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>javacc-maven-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <id>javacc</id>
- <goals>
- <goal>javacc</goal>
- </goals>
- <configuration>
- <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
- <includes>
- <include>**/Parser.jj</include>
- </includes>
- <lookAhead>2</lookAhead>
- <isStatic>false</isStatic>
- <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/config.fmpp b/external/sql/storm-sql-core/src/codegen/config.fmpp
deleted file mode 100644
index be5a792..0000000
--- a/external/sql/storm-sql-core/src/codegen/config.fmpp
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http:# www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-data: {
- parser: tdd(../data/Parser.tdd)
-}
-
-freemarkerLinks: {
- includes: includes/
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
deleted file mode 100644
index b0dccb6..0000000
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http:# www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- # Generated parser implementation class package and name
- package: "org.apache.storm.sql.parser.impl",
- class: "StormParserImpl",
-
- # List of import statements.
- imports: [
- "org.apache.calcite.sql.validate.*",
- "org.apache.calcite.util.*",
- "org.apache.storm.sql.parser.*",
- "java.util.*"
- ]
-
- # List of keywords.
- keywords: [
- "LOCATION",
- "INPUTFORMAT",
- "OUTPUTFORMAT",
- "PARALLELISM",
- "STORED",
- "TBLPROPERTIES",
- "JAR"
- ]
-
- # List of methods for parsing custom SQL statements.
- statementParserMethods: [
- "SqlCreateTable()",
- "SqlCreateFunction()"
- ]
-
- # List of methods for parsing custom literals.
- # Example: ParseJsonLiteral().
- literalParserMethods: [
- ]
-
- # List of methods for parsing custom data types.
- dataTypeParserMethods: [
- ]
-
- nonReservedKeywords: [
- ]
-
- createStatementParserMethods: [
- ]
-
- alterStatementParserMethods: [
- ]
-
- dropStatementParserMethods: [
- ]
-
- # List of files in @includes directory that have parser method
- # implementations for custom SQL statements, literals or types
- # given as part of "statementParserMethods", "literalParserMethods" or
- # "dataTypeParserMethods".
- implementationFiles: [
- "parserImpls.ftl"
- ]
-
- includeCompoundIdentifier: true,
- includeBraces: true,
- includeAdditionalDeclarations: false,
- allowBangEqual: false
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/includes/license.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/license.ftl b/external/sql/storm-sql-core/src/codegen/includes/license.ftl
deleted file mode 100644
index 7e66353..0000000
--- a/external/sql/storm-sql-core/src/codegen/includes/license.ftl
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
deleted file mode 100644
index 4143840..0000000
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ /dev/null
@@ -1,113 +0,0 @@
-<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-
-
-private void ColumnDef(List<ColumnDefinition> list) :
-{
- SqlParserPos pos;
- SqlIdentifier name;
- SqlDataTypeSpec type;
- ColumnConstraint constraint = null;
- SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
-}
-{
- name = SimpleIdentifier() { pos = getPos(); }
- type = DataType()
- [
- <PRIMARY> <KEY>
- [ <ASC> { monotonicity = SqlMonotonicity.INCREASING; }
- | <DESC> { monotonicity = SqlMonotonicity.DECREASING; }
- ]
- { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
- ]
- {
- list.add(new ColumnDefinition(name, type, constraint, pos));
- }
-}
-
-SqlNodeList ColumnDefinitionList() :
-{
- SqlParserPos pos;
- List<ColumnDefinition> list = Lists.newArrayList();
-}
-{
- <LPAREN> { pos = getPos(); }
- ColumnDef(list)
- ( <COMMA> ColumnDef(list) )*
- <RPAREN> {
- return new SqlNodeList(list, pos.plus(getPos()));
- }
-}
-
-/**
- * CREATE EXTERNAL TABLE ( IF NOT EXISTS )?
- * ( database_name '.' )? table_name ( '(' column_def ( ',' column_def )* ')'
- * ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
- * LOCATION location_uri
- * ( TBLPROPERTIES tbl_properties )?
- * ( AS select_stmt )
- */
-SqlNode SqlCreateTable() :
-{
- SqlParserPos pos;
- SqlIdentifier tblName;
- SqlNodeList fieldList;
- SqlNode location;
- SqlNode parallelism = null;
- SqlNode input_format_class_name = null, output_format_class_name = null;
- SqlNode tbl_properties = null;
- SqlNode select = null;
-}
-{
- <CREATE> { pos = getPos(); }
- <EXTERNAL> <TABLE>
- tblName = CompoundIdentifier()
- fieldList = ColumnDefinitionList()
- [
- <STORED> <AS>
- <INPUTFORMAT> input_format_class_name = StringLiteral()
- <OUTPUTFORMAT> output_format_class_name = StringLiteral()
- ]
- <LOCATION>
- location = StringLiteral()
- [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
- [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
- [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
- return new SqlCreateTable(pos, tblName, fieldList,
- input_format_class_name, output_format_class_name, location,
- parallelism, tbl_properties, select);
- }
-}
-
-/**
- * CREATE FUNCTION functionname AS 'classname'
- */
-SqlNode SqlCreateFunction() :
-{
- SqlParserPos pos;
- SqlIdentifier functionName;
- SqlNode className;
- SqlNode jarName = null;
-}
-{
- <CREATE> { pos = getPos(); }
- <FUNCTION>
- functionName = CompoundIdentifier()
- <AS>
- className = StringLiteral()
- [
- <USING> <JAR>
- jarName = StringLiteral()
- ]
- {
- return new SqlCreateFunction(pos, functionName, className, jarName);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
deleted file mode 100644
index 6af71d4..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.calcite.DataContext;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractTridentProcessor {
- protected Stream outputStream;
- protected DataContext dataContext;
- protected List<CompilingClassLoader> classLoaders;
- /**
- * @return the output stream of the SQL
- */
- public Stream outputStream() {
- return outputStream;
- }
-
- /**
- * Construct the trident topology based on the SQL.
- */
- public abstract TridentTopology build();
-
- /**
- * @return DataContext instance which is used with execution of query
- */
- public DataContext getDataContext() {
- return dataContext;
- }
-
- /**
- * @return Classloaders to compile. They're all chaining so the last classloader can access all classes.
- */
- public List<CompilingClassLoader> getClassLoaders() {
- return classLoaders;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
deleted file mode 100644
index 5dec4af..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.runtime.ChannelHandler;
-
-import java.util.Map;
-
-/**
- * The StormSql class provides standalone, interactive interfaces to execute
- * SQL statements over streaming data.
- * <p>
- * The StormSql class is stateless. The user needs to submit the data
- * definition language (DDL) statements and the query statements in the same
- * batch.
- */
-public abstract class StormSql {
- /**
- * Execute the SQL statements in stand-alone mode. The user can retrieve the result by passing in an instance
- * of {@see ChannelHandler}.
- */
- public abstract void execute(Iterable<String> statements,
- ChannelHandler handler) throws Exception;
-
- /**
- * Submit the SQL statements to Nimbus and run it as a topology.
- */
- public abstract void submit(
- String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
- StormSubmitter.ProgressListener progressListener, String asUser)
- throws Exception;
-
- /**
- * Print out query plan for each query.
- */
- public abstract void explain(Iterable<String> statements) throws Exception;
-
- public static StormSql construct() {
- return new StormSqlImpl();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
deleted file mode 100644
index 007daa7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
-import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.parser.ColumnConstraint;
-import org.apache.storm.sql.parser.ColumnDefinition;
-import org.apache.storm.sql.parser.SqlCreateFunction;
-import org.apache.storm.sql.parser.SqlCreateTable;
-import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.TridentTopology;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.Attributes;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-
-import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
-
-class StormSqlImpl extends StormSql {
- private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
- private final SchemaPlus schema = Frameworks.createRootSchema(true);
- private boolean hasUdf = false;
-
- @Override
- public void execute(
- Iterable<String> statements, ChannelHandler result)
- throws Exception {
- Map<String, DataSource> dataSources = new HashMap<>();
- for (String sql : statements) {
- StormParser parser = new StormParser(sql);
- SqlNode node = parser.impl().parseSqlStmtEof();
- if (node instanceof SqlCreateTable) {
- handleCreateTable((SqlCreateTable) node, dataSources);
- } else if (node instanceof SqlCreateFunction) {
- handleCreateFunction((SqlCreateFunction) node);
- } else {
- FrameworkConfig config = buildFrameWorkConfig();
- Planner planner = Frameworks.getPlanner(config);
- SqlNode parse = planner.parse(sql);
- SqlNode validate = planner.validate(parse);
- RelNode tree = planner.convert(validate);
- PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(tree);
- proc.initialize(dataSources, result);
- }
- }
- }
-
- @Override
- public void submit(
- String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
- StormSubmitter.ProgressListener progressListener, String asUser)
- throws Exception {
- Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
- for (String sql : statements) {
- StormParser parser = new StormParser(sql);
- SqlNode node = parser.impl().parseSqlStmtEof();
- if (node instanceof SqlCreateTable) {
- handleCreateTableForTrident((SqlCreateTable) node, dataSources);
- } else if (node instanceof SqlCreateFunction) {
- handleCreateFunction((SqlCreateFunction) node);
- } else {
- QueryPlanner planner = new QueryPlanner(schema);
- AbstractTridentProcessor processor = planner.compile(dataSources, sql);
- TridentTopology topo = processor.build();
-
- Path jarPath = null;
- try {
- // QueryPlanner on Trident mode configures the topology with compiled classes,
- // so we need to add new classes into topology jar
- // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
-
- jarPath = Files.createTempFile("storm-sql", ".jar");
- System.setProperty("storm.jar", jarPath.toString());
- packageTopology(jarPath, processor);
- StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
- } finally {
- if (jarPath != null) {
- Files.delete(jarPath);
- }
- }
- }
- }
- }
-
- @Override
- public void explain(Iterable<String> statements) throws Exception {
- Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
- for (String sql : statements) {
- StormParser parser = new StormParser(sql);
- SqlNode node = parser.impl().parseSqlStmtEof();
-
- System.out.println("===========================================================");
- System.out.println("query>");
- System.out.println(sql);
- System.out.println("-----------------------------------------------------------");
-
- if (node instanceof SqlCreateTable) {
- handleCreateTableForTrident((SqlCreateTable) node, dataSources);
- System.out.println("No plan presented on DDL");
- } else if (node instanceof SqlCreateFunction) {
- handleCreateFunction((SqlCreateFunction) node);
- System.out.println("No plan presented on DDL");
- } else {
- FrameworkConfig config = buildFrameWorkConfig();
- Planner planner = Frameworks.getPlanner(config);
- SqlNode parse = planner.parse(sql);
- SqlNode validate = planner.validate(parse);
- RelNode tree = planner.convert(validate);
-
- String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
- System.out.println("plan>");
- System.out.println(plan);
- }
-
- System.out.println("===========================================================");
- }
- }
-
- private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
- Manifest manifest = new Manifest();
- Attributes attr = manifest.getMainAttributes();
- attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
- attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
- try (JarOutputStream out = new JarOutputStream(
- new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
- List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
- if (classLoaders != null && !classLoaders.isEmpty()) {
- for (CompilingClassLoader classLoader : classLoaders) {
- for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
- out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
- out.write(e.getValue().toByteArray());
- out.closeEntry();
- }
- }
- }
- }
- }
-
- private void handleCreateTable(
- SqlCreateTable n, Map<String, DataSource> dataSources) {
- List<FieldInfo> fields = updateSchema(n);
- DataSource ds = DataSourcesRegistry.construct(n.location(), n
- .inputFormatClass(), n.outputFormatClass(), fields);
- if (ds == null) {
- throw new RuntimeException("Cannot construct data source for " + n
- .tableName());
- } else if (dataSources.containsKey(n.tableName())) {
- throw new RuntimeException("Duplicated definition for table " + n
- .tableName());
- }
- dataSources.put(n.tableName(), ds);
- }
-
- private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException {
- if(sqlCreateFunction.jarName() != null) {
- throw new UnsupportedOperationException("UDF 'USING JAR' not implemented");
- }
- Method method;
- Function function;
- if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != null) {
- function = ScalarFunctionImpl.create(method);
- } else if (findMethod(sqlCreateFunction.className(), "add") != null) {
- function = AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className()));
- } else {
- throw new RuntimeException("Invalid scalar or aggregate function");
- }
- schema.add(sqlCreateFunction.functionName().toUpperCase(), function);
- hasUdf = true;
- }
-
- private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException {
- Class<?> clazz = Class.forName(clazzName);
- for (Method method : clazz.getMethods()) {
- if (method.getName().equals(methodName)) {
- return method;
- }
- }
- return null;
- }
-
- private void handleCreateTableForTrident(
- SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
- List<FieldInfo> fields = updateSchema(n);
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
- .inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
- if (ds == null) {
- throw new RuntimeException("Failed to find data source for " + n
- .tableName() + " URI: " + n.location());
- } else if (dataSources.containsKey(n.tableName())) {
- throw new RuntimeException("Duplicated definition for table " + n
- .tableName());
- }
- dataSources.put(n.tableName(), ds);
- }
-
- private List<FieldInfo> updateSchema(SqlCreateTable n) {
- TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
- List<FieldInfo> fields = new ArrayList<>();
- for (ColumnDefinition col : n.fieldList()) {
- builder.field(col.name(), col.type(), col.constraint());
- RelDataType dataType = col.type().deriveType(typeFactory);
- Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
- ColumnConstraint constraint = col.constraint();
- boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey;
- fields.add(new FieldInfo(col.name(), javaType, isPrimary));
- }
-
- if (n.parallelism() != null) {
- builder.parallelismHint(n.parallelism());
- }
- Table table = builder.build();
- schema.add(n.tableName(), table);
- return fields;
- }
-
- private FrameworkConfig buildFrameWorkConfig() {
- if (hasUdf) {
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
- false,
- Collections.<String>emptyList(), typeFactory));
- return Frameworks.newConfigBuilder().defaultSchema(schema)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
- } else {
- return Frameworks.newConfigBuilder().defaultSchema(schema).build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
deleted file mode 100644
index 5618647..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.utils.Utils;
-
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-
-public class StormSqlRunner {
- private static final String OPTION_SQL_FILE_SHORT = "f";
- private static final String OPTION_SQL_FILE_LONG = "file";
- private static final String OPTION_SQL_TOPOLOGY_NAME_SHORT = "t";
- private static final String OPTION_SQL_TOPOLOGY_NAME_LONG = "topology";
- private static final String OPTION_SQL_EXPLAIN_SHORT = "e";
- private static final String OPTION_SQL_EXPLAIN_LONG = "explain";
-
- public static void main(String[] args) throws Exception {
- Options options = buildOptions();
- CommandLineParser parser = new DefaultParser();
- CommandLine commandLine = parser.parse(options, args);
-
- if (!commandLine.hasOption(OPTION_SQL_FILE_LONG)) {
- printUsageAndExit(options, OPTION_SQL_FILE_LONG + " is required");
- }
-
- String filePath = commandLine.getOptionValue(OPTION_SQL_FILE_LONG);
- List<String> stmts = Files.readAllLines(Paths.get(filePath), StandardCharsets.UTF_8);
- StormSql sql = StormSql.construct();
- @SuppressWarnings("unchecked")
- Map<String, ?> conf = Utils.readStormConfig();
-
- if (commandLine.hasOption(OPTION_SQL_EXPLAIN_LONG)) {
- sql.explain(stmts);
- } else if (commandLine.hasOption(OPTION_SQL_TOPOLOGY_NAME_LONG)) {
- String topoName = commandLine.getOptionValue(OPTION_SQL_TOPOLOGY_NAME_LONG);
- SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
- sql.submit(topoName, stmts, conf, submitOptions, null, null);
- } else {
- printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG +
- " must be presented");
- }
- }
-
- private static void printUsageAndExit(Options options, String message) {
- System.out.println(message);
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("storm-sql-runner ", options);
- System.exit(1);
- }
-
- private static Options buildOptions() {
- Options options = new Options();
- options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
- options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
- options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
- return options;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
deleted file mode 100644
index c6b584d..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.calcite;
-
-import org.apache.calcite.rel.stream.Delta;
-import org.apache.calcite.schema.StreamableTable;
-
-/**
- * Table that can be converted to a stream. This table also has its parallelism information.
- *
- * @see Delta
- */
-public interface ParallelStreamableTable extends StreamableTable {
-
- /**
- * Returns parallelism hint of this table. Returns null if don't know.
- */
- Integer parallelismHint();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
deleted file mode 100644
index 2e237c0..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.parser.ColumnConstraint;
-
-import java.util.ArrayList;
-
-import static org.apache.calcite.rel.RelFieldCollation.Direction;
-import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
-import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
-import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
-import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
-
-public class CompilerUtil {
- public static String escapeJavaString(String s, boolean nullMeansNull) {
- if(s == null) {
- return nullMeansNull ? "null" : "\"\"";
- } else {
- String s1 = Util.replace(s, "\\", "\\\\");
- String s2 = Util.replace(s1, "\"", "\\\"");
- String s3 = Util.replace(s2, "\n\r", "\\n");
- String s4 = Util.replace(s3, "\n", "\\n");
- String s5 = Util.replace(s4, "\r", "\\r");
- return "\"" + s5 + "\"";
- }
- }
-
- public static class TableBuilderInfo {
- private final RelDataTypeFactory typeFactory;
-
- public TableBuilderInfo(RelDataTypeFactory typeFactory) {
- this.typeFactory = typeFactory;
- }
-
- private static class FieldType {
- private final String name;
- private final RelDataType relDataType;
-
- private FieldType(String name, RelDataType relDataType) {
- this.name = name;
- this.relDataType = relDataType;
- }
-
- }
-
- private final ArrayList<FieldType> fields = new ArrayList<>();
- private final ArrayList<Object[]> rows = new ArrayList<>();
- private int primaryKey = -1;
- private Integer parallelismHint;
- private SqlMonotonicity primaryKeyMonotonicity;
- private Statistic stats;
-
- public TableBuilderInfo field(String name, SqlTypeName type) {
- return field(name, typeFactory.createSqlType(type));
- }
-
- public TableBuilderInfo field(String name, RelDataType type) {
- fields.add(new FieldType(name, type));
- return this;
- }
-
- public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
- RelDataType dataType = type.deriveType(typeFactory);
- if (constraint instanceof ColumnConstraint.PrimaryKey) {
- ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
- Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
- primaryKey = fields.size();
- primaryKeyMonotonicity = pk.monotonicity();
- }
- fields.add(new FieldType(name, dataType));
- return this;
- }
-
- public TableBuilderInfo statistics(Statistic stats) {
- this.stats = stats;
- return this;
- }
-
- @VisibleForTesting
- public TableBuilderInfo rows(Object[] data) {
- rows.add(data);
- return this;
- }
-
- public TableBuilderInfo parallelismHint(int parallelismHint) {
- this.parallelismHint = parallelismHint;
- return this;
- }
-
- public StreamableTable build() {
- final Statistic stat = buildStatistic();
- final Table tbl = new Table() {
- @Override
- public RelDataType getRowType(
- RelDataTypeFactory relDataTypeFactory) {
- RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
- for (FieldType f : fields) {
- b.add(f.name, f.relDataType);
- }
- return b.build();
- }
-
- @Override
- public Statistic getStatistic() {
- return stat != null ? stat : Statistics.of(rows.size(),
- ImmutableList.<ImmutableBitSet>of());
- }
-
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.STREAM;
- }
- };
-
- return new ParallelStreamableTable() {
- @Override
- public Integer parallelismHint() {
- return parallelismHint;
- }
-
- @Override
- public Table stream() {
- return tbl;
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
- return tbl.getRowType(relDataTypeFactory);
- }
-
- @Override
- public Statistic getStatistic() {
- return tbl.getStatistic();
- }
-
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.STREAM;
- }
- };
- }
-
- private Statistic buildStatistic() {
- if (stats != null || primaryKey == -1) {
- return stats;
- }
- Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
- RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
- return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
- ImmutableList.of(RelCollations.of(collation)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
deleted file mode 100644
index 5ac95e0..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.calcite.adapter.enumerable.JavaRowFormat;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.BlockBuilder;
-import org.apache.calcite.linq4j.tree.BlockStatement;
-import org.apache.calcite.linq4j.tree.ClassDeclaration;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.MemberDeclaration;
-import org.apache.calcite.linq4j.tree.ParameterExpression;
-import org.apache.calcite.linq4j.tree.Types;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.util.BuiltInMethod;
-import org.apache.calcite.util.Pair;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to Java source code String.
- *
- * This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is executable,
- * we need to pass the source code to compile and serialize instance so that it can be executed on worker efficiently.
- */
-public class RexNodeToJavaCodeCompiler {
- private final RexBuilder rexBuilder;
-
- public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
- this.rexBuilder = rexBuilder;
- }
-
- public BlockStatement compileToBlock(List<RexNode> nodes, RelDataType inputRowType) {
- final RexProgramBuilder programBuilder =
- new RexProgramBuilder(inputRowType, rexBuilder);
- for (RexNode node : nodes) {
- programBuilder.addProject(node, null);
- }
-
- return compileToBlock(programBuilder.getProgram());
- }
-
- public BlockStatement compileToBlock(final RexProgram program) {
- final ParameterExpression context_ =
- Expressions.parameter(Context.class, "context");
- final ParameterExpression outputValues_ =
- Expressions.parameter(Object[].class, "outputValues");
-
- return compileToBlock(program, context_, outputValues_).toBlock();
- }
-
- public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
- final RexProgramBuilder programBuilder =
- new RexProgramBuilder(inputRowType, rexBuilder);
- for (RexNode node : nodes) {
- programBuilder.addProject(node, null);
- }
-
- return compile(programBuilder.getProgram(), className);
- }
-
- public String compile(final RexProgram program, String className) {
- final ParameterExpression context_ =
- Expressions.parameter(Context.class, "context");
- final ParameterExpression outputValues_ =
- Expressions.parameter(Object[].class, "outputValues");
-
- BlockBuilder builder = compileToBlock(program, context_, outputValues_);
- return baz(context_, outputValues_, builder.toBlock(), className);
- }
-
- private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
- ParameterExpression outputValues_) {
- RelDataType inputRowType = program.getInputRowType();
- final BlockBuilder builder = new BlockBuilder();
- final JavaTypeFactoryImpl javaTypeFactory =
- new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
-
- final RexToLixTranslator.InputGetter inputGetter =
- new RexToLixTranslator.InputGetterImpl(
- ImmutableList.of(
- Pair.<Expression, PhysType>of(
- Expressions.field(context_,
- BuiltInMethod.CONTEXT_VALUES.field),
- PhysTypeImpl.of(javaTypeFactory, inputRowType,
- JavaRowFormat.ARRAY, false))));
- final Function1<String, RexToLixTranslator.InputGetter> correlates =
- new Function1<String, RexToLixTranslator.InputGetter>() {
- public RexToLixTranslator.InputGetter apply(String a0) {
- throw new UnsupportedOperationException();
- }
- };
- final Expression root =
- Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
- final List<Expression> list =
- RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
- null, root, inputGetter, correlates);
- for (int i = 0; i < list.size(); i++) {
- builder.add(
- Expressions.statement(
- Expressions.assign(
- Expressions.arrayIndex(outputValues_,
- Expressions.constant(i)),
- list.get(i))));
- }
-
- return builder;
- }
-
- /** Given a method that implements {@link ExecutableExpression#execute(Context, Object[])},
- * adds a bridge method that implements {@link ExecutableExpression#execute(Context)}, and
- * compiles. */
- static String baz(ParameterExpression context_,
- ParameterExpression outputValues_, BlockStatement block, String className) {
- final List<MemberDeclaration> declarations = Lists.newArrayList();
-
- // public void execute(Context, Object[] outputValues)
- declarations.add(
- Expressions.methodDecl(Modifier.PUBLIC, void.class,
- StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
- ImmutableList.of(context_, outputValues_), block));
-
- // public Object execute(Context)
- final BlockBuilder builder = new BlockBuilder();
- final Expression values_ = builder.append("values",
- Expressions.newArrayBounds(Object.class, 1,
- Expressions.constant(1)));
- builder.add(
- Expressions.statement(
- Expressions.call(
- Expressions.parameter(ExecutableExpression.class, "this"),
- StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
- builder.add(
- Expressions.return_(null,
- Expressions.arrayIndex(values_, Expressions.constant(0))));
- declarations.add(
- Expressions.methodDecl(Modifier.PUBLIC, Object.class,
- StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
- ImmutableList.of(context_), builder.toBlock()));
-
- final ClassDeclaration classDeclaration =
- Expressions.classDecl(Modifier.PUBLIC, className, null,
- ImmutableList.<Type>of(ExecutableExpression.class), declarations);
-
- return Expressions.toString(Lists.newArrayList(classDeclaration), "\n", false);
- }
-
- enum StormBuiltInMethod {
- EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
- EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
-
- public final Method method;
- public final Constructor constructor;
- public final Field field;
-
- public static final ImmutableMap<Method, BuiltInMethod> MAP;
-
- static {
- final ImmutableMap.Builder<Method, BuiltInMethod> builder =
- ImmutableMap.builder();
- for (BuiltInMethod value : BuiltInMethod.values()) {
- if (value.method != null) {
- builder.put(value.method, value);
- }
- }
- MAP = builder.build();
- }
-
- private StormBuiltInMethod(Method method, Constructor constructor, Field field) {
- this.method = method;
- this.constructor = constructor;
- this.field = field;
- }
-
- /**
- * Defines a method.
- */
- StormBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
- this(Types.lookupMethod(clazz, methodName, argumentTypes), null, null);
- }
-
- /**
- * Defines a constructor.
- */
- StormBuiltInMethod(Class clazz, Class... argumentTypes) {
- this(null, Types.lookupConstructor(clazz, argumentTypes), null);
- }
-
- /**
- * Defines a field.
- */
- StormBuiltInMethod(Class clazz, String fieldName, boolean dummy) {
- this(null, null, Types.lookupField(clazz, fieldName));
- assert dummy : "dummy value for method overloading must be true";
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
deleted file mode 100644
index 21ca063..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl {
-
- public StormSqlTypeFactoryImpl() {
- }
-
- public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
- super(typeSystem);
- }
-
- @Override
- public RelDataType toSql(RelDataType type) {
- if (type instanceof JavaType) {
- JavaType javaType = (JavaType) type;
- SqlTypeName sqlTypeName = JavaToSqlTypeConversionRules.instance().lookup(javaType.getJavaClass());
- if (sqlTypeName == null) {
- sqlTypeName = SqlTypeName.ANY;
- }
- return createTypeWithNullability(createSqlType(sqlTypeName), type.isNullable());
- }
- return super.toSql(type);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
deleted file mode 100644
index 9dc4ba8..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.storm.tuple.Values;
-
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Built-in implementations for some of the standard aggregation operations.
- * Aggregations can be implemented as a class with the following methods viz. init, add and result.
- * The class could contain only static methods, only non-static methods or be generic.
- */
-public class BuiltinAggregateFunctions {
- // binds the type information and the class implementing the aggregation
- public static class TypeClass {
- public static class GenericType {
- }
-
- public final Type ty;
- public final Class<?> clazz;
-
- private TypeClass(Type ty, Class<?> clazz) {
- this.ty = ty;
- this.clazz = clazz;
- }
-
- static TypeClass of(Type ty, Class<?> clazz) {
- return new TypeClass(ty, clazz);
- }
- }
-
- static final Map<String, List<TypeClass>> TABLE = new HashMap<>();
-
- public static class ByteSum {
- public static Byte init() {
- return 0;
- }
-
- public static Byte add(Byte accumulator, Byte val) {
- return (byte) (accumulator + val);
- }
-
- public static Byte result(Byte accumulator) {
- return accumulator;
- }
- }
-
- public static class ShortSum {
- public static Short init() {
- return 0;
- }
-
- public static Short add(Short accumulator, Short val) {
- return (short) (accumulator + val);
- }
-
- public static Short result(Short accumulator) {
- return accumulator;
- }
- }
-
- public static class IntSum {
- public static Integer init() {
- return 0;
- }
-
- public static Integer add(Integer accumulator, Integer val) {
- return accumulator + val;
- }
-
- public static Integer result(Integer accumulator) {
- return accumulator;
- }
- }
-
- public static class LongSum {
- public static Long init() {
- return 0L;
- }
-
- public static Long add(Long accumulator, Long val) {
- return accumulator + val;
- }
-
- public static Long result(Long accumulator) {
- return accumulator;
- }
- }
-
- public static class FloatSum {
- public static Float init() {
- return 0.0f;
- }
-
- public static Float add(Float accumulator, Float val) {
- return accumulator + val;
- }
-
- public static Float result(Float accumulator) {
- return accumulator;
- }
- }
-
- public static class DoubleSum {
- public static Double init() {
- return 0.0;
- }
-
- public static Double add(Double accumulator, Double val) {
- return accumulator + val;
- }
-
- public static Double result(Double accumulator) {
- return accumulator;
- }
- }
-
- public static class Max<T extends Comparable<T>> {
- public T init() {
- return null;
- }
-
- public T add(T accumulator, T val) {
- return (accumulator == null || accumulator.compareTo(val) < 0) ? val : accumulator;
- }
-
- public T result(T accumulator) {
- return accumulator;
- }
- }
-
- public static class Min<T extends Comparable<T>> {
- public T init() {
- return null;
- }
-
- public T add(T accumulator, T val) {
- return (accumulator == null || accumulator.compareTo(val) > 0) ? val : accumulator;
- }
-
- public T result(T accumulator) {
- return accumulator;
- }
- }
-
- public static class IntAvg {
- private int count;
-
- public Integer init() {
- return 0;
- }
-
- public Integer add(Integer accumulator, Integer val) {
- ++count;
- return accumulator + val;
- }
-
- public Integer result(Integer accumulator) {
- Integer result = accumulator / count;
- count = 0;
- return result;
- }
- }
-
- public static class DoubleAvg {
- private int count;
-
- public Double init() {
- return 0.0;
- }
-
- public Double add(Double accumulator, Double val) {
- ++count;
- return accumulator + val;
- }
-
- public Double result(Double accumulator) {
- Double result = accumulator / count;
- count = 0;
- return result;
- }
- }
-
- public static class Count {
- public static Long init() {
- return 0L;
- }
-
- public static Long add(Long accumulator, Values vals) {
- for (Object val : vals) {
- if (val == null) {
- return accumulator;
- }
- }
- return accumulator + 1;
- }
-
- public static Long result(Long accumulator) {
- return accumulator;
- }
- }
-
- static {
- TABLE.put("SUM", ImmutableList.of(
- TypeClass.of(float.class, FloatSum.class),
- TypeClass.of(double.class, DoubleSum.class),
- TypeClass.of(byte.class, ByteSum.class),
- TypeClass.of(short.class, ShortSum.class),
- TypeClass.of(long.class, LongSum.class),
- TypeClass.of(int.class, IntSum.class)));
- TABLE.put("AVG", ImmutableList.of(
- TypeClass.of(double.class, DoubleAvg.class),
- TypeClass.of(int.class, IntAvg.class)));
- TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, Count.class)));
- TABLE.put("MAX", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class)));
- TABLE.put("MIN", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class)));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
deleted file mode 100644
index 01546ed..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.Set;
-
/**
 * Compiles a Calcite logical plan ({@link RelNode} tree) into the Java source
 * of a {@code Processor} class in the {@code org.apache.storm.sql.generated}
 * package, compiles that source in memory, and returns an instance as an
 * {@link AbstractValuesProcessor}.
 *
 * <p>NOTE(review): the exact text of the string constants below IS the
 * behavior of this class — the generated program is assembled from them
 * verbatim. (The whitespace inside these literals may have been mangled by
 * the diff rendering this file was recovered from — TODO confirm against the
 * original source.)
 */
public class PlanCompiler {
    private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class);

    private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
    // Package the generated Processor class is emitted into and loaded from.
    private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
    // Header of the generated source: package declaration, imports, and the
    // opening of the Processor class with its shared DataContext field.
    private static final String PROLOGUE = NEW_LINE_JOINER.join(
        "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
        "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;",
        "import java.util.List;", "import java.util.ArrayList;",
        "import java.util.LinkedHashMap;",
        "import org.apache.storm.tuple.Values;",
        "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
        "import org.apache.storm.sql.runtime.Channels;",
        "import org.apache.storm.sql.runtime.ChannelContext;",
        "import org.apache.storm.sql.runtime.ChannelHandler;",
        "import org.apache.storm.sql.runtime.DataSource;",
        "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
        "import com.google.common.collect.ArrayListMultimap;",
        "import com.google.common.collect.Multimap;",
        "import org.apache.calcite.interpreter.Context;",
        "import org.apache.calcite.interpreter.StormContext;",
        "import org.apache.calcite.DataContext;",
        "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
        "public final class Processor extends AbstractValuesProcessor {",
        " public final static DataContext dataContext = new StormDataContext();",
        "");
    // Opening of the generated initialize() method: chains the caller's
    // result handler onto a void context as the root channel "r".
    private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
        " @Override",
        " public void initialize(Map<String, DataSource> data,",
        " ChannelHandler result) {",
        " ChannelContext r = Channels.chain(Channels.voidContext(), result);",
        ""
    );

    // Used by RelNodeCompiler to render expressions with Java types.
    private final JavaTypeFactory typeFactory;

    public PlanCompiler(JavaTypeFactory typeFactory) {
        this.typeFactory = typeFactory;
    }

    /**
     * Renders the full generated source: prologue, one stage per relational
     * operator (emitted by RelNodeCompiler's traversal), the initialize()
     * wiring produced by printMain, and the closing brace.
     */
    private String generateJavaSource(RelNode root) throws Exception {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
            printPrologue(pw);
            compiler.traverse(root);
            printMain(pw, root);
            printEpilogue(pw);
        }
        return sw.toString();
    }

    /**
     * Emits the body of initialize(): chains the operator channel contexts,
     * then, for every table scan found during chaining, emits a lookup of the
     * table's qualified name in the data map (failing at runtime with
     * "Cannot find table" when absent) and an open() of its channel context.
     */
    private void printMain(PrintWriter pw, RelNode root) {
        Set<TableScan> tables = new HashSet<>();
        pw.print(INITIALIZER_PROLOGUE);
        chainOperators(pw, root, tables);
        for (TableScan n : tables) {
            // Qualified table name, escaped so it can appear as a Java
            // string literal in the generated source.
            String escaped = CompilerUtil.escapeJavaString(
                Joiner.on('.').join(n.getTable().getQualifiedName()), true);
            String r = NEW_LINE_JOINER.join(
                " if (!data.containsKey(%1$s))",
                " throw new RuntimeException(\"Cannot find table \" + %1$s);",
                " data.get(%1$s).open(CTX_%2$d);",
                "");
            pw.print(String.format(r, escaped, n.getId()));
        }
        pw.print(" }\n");
    }

    /** Starts operator chaining from the root channel context "r". */
    private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) {
        doChainOperators(pw, root, tables, "r");
    }

    /**
     * Recursively emits, for each plan node, a CTX_<id> channel context
     * chained onto its parent's context, collecting TableScan nodes along the
     * way so printMain can open them. The format string mixes a relative
     * index (%d consumes the first argument) with explicit %2$s/%3$s indices;
     * this is legal Java Formatter usage but easy to misread.
     */
    private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> tables, String parentCtx) {
        pw.print(
            String.format(" ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
                node.getId(), parentCtx, RelNodeCompiler.getStageName(node)));
        String currentCtx = String.format("CTX_%d", node.getId());
        if (node instanceof TableScan) {
            tables.add((TableScan) node);
        }
        for (RelNode i : node.getInputs()) {
            doChainOperators(pw, i, tables, currentCtx);
        }
    }

    /**
     * Generates, compiles (via CompilingClassLoader, entirely in memory), and
     * instantiates the Processor for the given plan.
     *
     * @param plan root of the Calcite logical plan
     * @return a freshly constructed processor for the plan
     * @throws Exception on generation, compilation, or reflection failure
     */
    public AbstractValuesProcessor compile(RelNode plan) throws Exception {
        String javaCode = generateJavaSource(plan);
        LOG.debug("Compiling... source code {}", javaCode);
        ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
            PACKAGE_NAME + ".Processor",
            javaCode, null);
        return (AbstractValuesProcessor) cl.loadClass(
            PACKAGE_NAME + ".Processor").newInstance();
    }

    /** Closes the generated Processor class. */
    private static void printEpilogue(
        PrintWriter pw) throws Exception {
        pw.print("}\n");
    }

    /** Writes the fixed generated-source header. */
    private static void printPrologue(PrintWriter pw) {
        pw.append(PROLOGUE);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
deleted file mode 100644
index afed8a9..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.*;
-import org.apache.calcite.rel.stream.Delta;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class PostOrderRelNodeVisitor<T> {
- public final T traverse(RelNode n) throws Exception {
- List<T> inputStreams = new ArrayList<>();
- for (RelNode input : n.getInputs()) {
- inputStreams.add(traverse(input));
- }
-
- if (n instanceof Aggregate) {
- return visitAggregate((Aggregate) n, inputStreams);
- } else if (n instanceof Calc) {
- return visitCalc((Calc) n, inputStreams);
- } else if (n instanceof Collect) {
- return visitCollect((Collect) n, inputStreams);
- } else if (n instanceof Correlate) {
- return visitCorrelate((Correlate) n, inputStreams);
- } else if (n instanceof Delta) {
- return visitDelta((Delta) n, inputStreams);
- } else if (n instanceof Exchange) {
- return visitExchange((Exchange) n, inputStreams);
- } else if (n instanceof Project) {
- return visitProject((Project) n, inputStreams);
- } else if (n instanceof Filter) {
- return visitFilter((Filter) n, inputStreams);
- } else if (n instanceof Sample) {
- return visitSample((Sample) n, inputStreams);
- } else if (n instanceof Sort) {
- return visitSort((Sort) n, inputStreams);
- } else if (n instanceof TableModify) {
- return visitTableModify((TableModify) n, inputStreams);
- } else if (n instanceof TableScan) {
- return visitTableScan((TableScan) n, inputStreams);
- } else if (n instanceof Uncollect) {
- return visitUncollect((Uncollect) n, inputStreams);
- } else if (n instanceof Window) {
- return visitWindow((Window) n, inputStreams);
- } else if (n instanceof Join) {
- return visitJoin((Join) n, inputStreams);
- } else {
- return defaultValue(n, inputStreams);
- }
- }
-
- public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
- return defaultValue(aggregate, inputStreams);
- }
-
- public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
- return defaultValue(calc, inputStreams);
- }
-
- public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
- return defaultValue(collect, inputStreams);
- }
-
- public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
- return defaultValue(correlate, inputStreams);
- }
-
- public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
- return defaultValue(delta, inputStreams);
- }
-
- public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
- return defaultValue(exchange, inputStreams);
- }
-
- public T visitProject(Project project, List<T> inputStreams) throws Exception {
- return defaultValue(project, inputStreams);
- }
-
- public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
- return defaultValue(filter, inputStreams);
- }
-
- public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
- return defaultValue(sample, inputStreams);
- }
-
- public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
- return defaultValue(sort, inputStreams);
- }
-
- public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
- return defaultValue(modify, inputStreams);
- }
-
- public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
- return defaultValue(scan, inputStreams);
- }
-
- public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
- return defaultValue(uncollect, inputStreams);
- }
-
- public T visitWindow(Window window, List<T> inputStreams) throws Exception {
- return defaultValue(window, inputStreams);
- }
-
- public T visitJoin(Join join, List<T> inputStreams) throws Exception {
- return defaultValue(join, inputStreams);
- }
-
- public T defaultValue(RelNode n, List<T> inputStreams) {
- return null;
- }
-}