You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:51 UTC
[05/20] storm git commit: [StormSQL] STORM-1149. Support pluggable data sources in CREATE TABLE.
[StormSQL] STORM-1149. Support pluggable data sources in CREATE TABLE.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0bb8e46c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0bb8e46c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0bb8e46c
Branch: refs/heads/master
Commit: 0bb8e46cea7fa162c1fd16203e1e415e13636fb1
Parents: 915f135
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 2 15:35:29 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800
----------------------------------------------------------------------
.../apache/storm/sql/DataSourcesProvider.java | 47 +++++++++
.../apache/storm/sql/DataSourcesRegistry.java | 66 ++++++++++++
.../src/jvm/org/apache/storm/sql/StormSql.java | 42 ++++++++
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 100 +++++++++++++++++++
.../storm/sql/parser/ColumnDefinition.java | 12 +++
.../apache/storm/sql/parser/SqlCreateTable.java | 25 +++++
.../apache/storm/sql/parser/StormParser.java | 2 +-
.../test/org/apache/storm/sql/TestStormSql.java | 73 ++++++++++++++
8 files changed, 366 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
new file mode 100644
index 0000000..cc4874b
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public interface DataSourcesProvider {
+  /**
+   * Returns the URI scheme this provider is responsible for.
+   *
+   * @return the scheme of the data source, e.g. {@code mock} for a table
+   *         declared with {@code LOCATION 'mock:///foo'}
+   */
+  String scheme();
+
+  /**
+   * Constructs a new data source for a table declared with CREATE TABLE.
+   *
+   * @param uri the URI that specifies the data source; its format is fully
+   *            customizable by the provider
+   * @param inputFormatClass the name of the class that deserializes data, or
+   *                         {@code null} when unspecified
+   * @param outputFormatClass the name of the class that serializes data, or
+   *                          {@code null} when unspecified
+   * @param fields the name and Java type of each column of the table
+   * @return the constructed data source; callers treat {@code null} as a
+   *         failure to construct one
+   */
+  DataSource construct(URI uri, String inputFormatClass,
+                       String outputFormatClass,
+                       List<Map.Entry<String, Class<?>>> fields);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
new file mode 100644
index 0000000..a841609
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.sql.storm.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * Registry of {@link DataSourcesProvider}s, keyed by URI scheme. Providers are
+ * discovered through {@link ServiceLoader} when this class is initialized.
+ */
+public class DataSourcesRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DataSourcesRegistry.class);
+  private static final Map<String, DataSourcesProvider> providers;
+
+  static {
+    providers = new HashMap<>();
+    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+        DataSourcesProvider.class);
+    for (DataSourcesProvider p : loader) {
+      String scheme = p.scheme();
+      LOG.info("Registering scheme {} with {}", scheme, p);
+      DataSourcesProvider previous = providers.put(scheme, p);
+      // Which provider wins depends on classpath order, so make the conflict
+      // visible instead of silently dropping one of the providers.
+      if (previous != null) {
+        LOG.warn("Scheme {} was already registered with {}; replacing it with {}",
+            scheme, previous, p);
+      }
+    }
+  }
+
+  private DataSourcesRegistry() {
+  }
+
+  /**
+   * Constructs a data source through the provider registered for the scheme
+   * of {@code uri}.
+   *
+   * @param uri the location of the data source; its scheme selects the provider
+   * @param inputFormatClass the deserializer class name, or {@code null}
+   * @param outputFormatClass the serializer class name, or {@code null}
+   * @param fields the name and Java type of each column of the table
+   * @return the data source built by the provider, or {@code null} if no
+   *         provider is registered for the scheme of {@code uri}
+   */
+  public static DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<Map.Entry<String, Class<?>>> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null;
+    }
+
+    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
+  }
+
+  /**
+   * Exposes the live, mutable provider map so tests can register and remove
+   * mock providers.
+   */
+  @VisibleForTesting
+  static Map<String, DataSourcesProvider> providerMap() {
+    return providers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
new file mode 100644
index 0000000..477e633
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.util.Map;
+
+/**
+ * The StormSql class provides standalone, interactive interfaces to execute
+ * SQL statements over streaming data.
+ *
+ * The StormSql class is stateless. The user needs to submit the data
+ * definition language (DDL) statements and the query statements in the same
+ * batch.
+ */
+public abstract class StormSql {
+  /**
+   * Executes a batch of SQL statements. Tables declared by CREATE TABLE
+   * statements can be queried by later statements in the same batch.
+   *
+   * @param statements the DDL and query statements, in execution order
+   * @param handler the channel handler that query output is delivered to
+   * @throws Exception if a statement cannot be parsed, planned or executed
+   */
+  public abstract void execute(Iterable<String> statements,
+                               ChannelHandler handler) throws Exception;
+
+  /**
+   * Creates the default {@link StormSql} implementation.
+   *
+   * @return a new instance
+   */
+  public static StormSql construct() {
+    return new StormSqlImpl();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
new file mode 100644
index 0000000..136bc88
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.storm.sql.compiler.PlanCompiler;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
+
+/**
+ * Default {@link StormSql} implementation. CREATE TABLE statements register a
+ * table and its data source. Every other statement is planned with Calcite,
+ * compiled by {@link PlanCompiler} and run against the data sources defined
+ * earlier in the same batch.
+ */
+class StormSqlImpl extends StormSql {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  // NOTE(review): this schema outlives a single execute() call while the data
+  // sources do not; confirm that is intended, given StormSql is documented as
+  // stateless.
+  private final SchemaPlus schema = Frameworks.createRootSchema(true);
+
+  @Override
+  public void execute(
+      Iterable<String> statements, ChannelHandler result)
+      throws Exception {
+    Map<String, DataSource> dataSources = new HashMap<>();
+    for (String sql : statements) {
+      StormParser parser = new StormParser(sql);
+      SqlNode node = parser.impl().parseSqlStmtEof();
+      if (node instanceof SqlCreateTable) {
+        handleCreateTable((SqlCreateTable) node, dataSources);
+      } else {
+        runQuery(sql, dataSources, result);
+      }
+    }
+  }
+
+  /** Plans, compiles and initializes a single query statement. */
+  private void runQuery(String sql, Map<String, DataSource> dataSources,
+                        ChannelHandler result) throws Exception {
+    FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+        schema).build();
+    Planner planner = Frameworks.getPlanner(config);
+    try {
+      SqlNode parse = planner.parse(sql);
+      SqlNode validate = planner.validate(parse);
+      RelNode tree = planner.convert(validate);
+      PlanCompiler compiler = new PlanCompiler(typeFactory);
+      AbstractValuesProcessor proc = compiler.compile(tree);
+      proc.initialize(dataSources, result);
+    } finally {
+      // A new planner is created per query; release its resources afterwards.
+      planner.close();
+    }
+  }
+
+  /**
+   * Registers the table declared by {@code n} in the Calcite schema and its
+   * data source in {@code dataSources}.
+   *
+   * @throws RuntimeException if the table was already defined in this batch,
+   *         or if no data source could be constructed for its location
+   */
+  private void handleCreateTable(
+      SqlCreateTable n, Map<String, DataSource> dataSources) {
+    String tableName = n.tableName();
+    // Reject duplicates before doing any work, so a failed definition neither
+    // builds a data source nor replaces the table already in the schema.
+    if (dataSources.containsKey(tableName)) {
+      throw new RuntimeException("Duplicated definition for table " + tableName);
+    }
+    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
+    List<Map.Entry<String, Class<?>>> fields = new ArrayList<>();
+    for (ColumnDefinition col : n.fieldList()) {
+      builder.field(col.name(), col.type());
+      RelDataType dataType = col.type().deriveType(typeFactory);
+      Class<?> javaType = (Class<?>) typeFactory.getJavaClass(dataType);
+      fields.add(new AbstractMap.SimpleImmutableEntry<String, Class<?>>(
+          col.name(), javaType));
+    }
+    DataSource ds = DataSourcesRegistry.construct(n.location(),
+        n.inputFormatClass(), n.outputFormatClass(), fields);
+    if (ds == null) {
+      throw new RuntimeException("Cannot construct data source for " + tableName);
+    }
+    // Expose the table to the planner only once its data source exists.
+    Table table = builder.build();
+    schema.add(tableName, table);
+    dataSources.put(tableName, ds);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
index 27f2e57..3520b86 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
@@ -29,4 +29,16 @@ public class ColumnDefinition extends SqlNodeList {
SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
super(Arrays.asList(name, type, constraint), pos);
}
+
+ /**
+ * @return the column name, rendered via toString() of the identifier node
+ * stored at index 0
+ */
+ public String name() {
+ return get(0).toString();
+ }
+
+ /**
+ * @return the declared SQL data type of the column (the node at index 1)
+ */
+ public SqlDataTypeSpec type() {
+ return (SqlDataTypeSpec) get(1);
+ }
+
+ /**
+ * @return the column constraint (the node at index 2).
+ * NOTE(review): may be null if the parser passes no constraint to the
+ * constructor; confirm against the grammar.
+ */
+ public ColumnConstraint constraint() {
+ return (ColumnConstraint) get(2);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index e81d146..8fe4160 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
+import java.net.URI;
import java.util.List;
public class SqlCreateTable extends SqlCall {
@@ -102,4 +103,28 @@ public class SqlCreateTable extends SqlCall {
outputFormatClass, location, properties,
query);
}
+
+ /**
+ * @return the table name, rendered via toString() of the table identifier
+ */
+ public String tableName() {
+ return tblName.toString();
+ }
+
+ /**
+ * @return the LOCATION literal parsed as a URI; URI.create throws
+ * IllegalArgumentException if the string is not a valid URI.
+ * NOTE(review): unlike the format-class accessors there is no null check,
+ * presumably because LOCATION is mandatory in the grammar; confirm.
+ */
+ public URI location() {
+ return URI.create(SqlLiteral.stringValue(location));
+ }
+
+  /**
+   * Returns the input format class given in the CREATE TABLE statement.
+   *
+   * @return the class name that deserializes data, or {@code null} if the
+   *         statement did not specify one
+   */
+  public String inputFormatClass() {
+    if (inputFormatClass == null) {
+      return null;
+    }
+    return SqlLiteral.stringValue(inputFormatClass);
+  }
+
+ public String outputFormatClass() {
+ return outputFormatClass == null ? null : SqlLiteral.stringValue
+ (outputFormatClass);
+ }
+
+ /**
+ * @return the column definitions of the table. The list from
+ * fieldList.getList() is cast without a runtime check, so an element that is
+ * not a ColumnDefinition would only fail (ClassCastException) when read.
+ */
+ @SuppressWarnings("unchecked")
+ public List<ColumnDefinition> fieldList() {
+ return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
index 9c74f28..670901e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -36,7 +36,7 @@ public class StormParser {
}
@VisibleForTesting
- StormParserImpl impl() {
+ public StormParserImpl impl() { // NOTE(review): now called by StormSqlImpl in production code, so @VisibleForTesting is stale
return impl;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
new file mode 100644
index 0000000..07b367f
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.compiler.TestUtils;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestStormSql {
+  /** Provider for the {@code mock} scheme; always returns a fresh mock source. */
+  private static class MockDataSourceProvider implements DataSourcesProvider {
+    @Override
+    public String scheme() {
+      return "mock";
+    }
+
+    @Override
+    public DataSource construct(
+        URI uri, String inputFormatClass, String outputFormatClass,
+        List<Map.Entry<String, Class<?>>> fields) {
+      return new TestUtils.MockDataSource();
+    }
+  }
+
+  /** Makes the mock scheme resolvable through the registry. */
+  @BeforeClass
+  public static void setUp() {
+    DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+  }
+
+  /** Removes the mock provider again once the tests in this class finish. */
+  @AfterClass
+  public static void tearDown() {
+    DataSourcesRegistry.providerMap().remove("mock");
+  }
+
+  /**
+   * Declares a table backed by the mock source, runs a filtered projection
+   * over it, and checks the rows delivered to the channel handler.
+   */
+  @Test
+  public void testExternalDataSource() throws Exception {
+    List<String> statements = new ArrayList<>();
+    statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    statements.add("SELECT ID + 1 FROM FOO WHERE ID > 2");
+    List<Values> collected = new ArrayList<>();
+    ChannelHandler handler = new TestUtils.CollectDataChannelHandler(collected);
+    StormSql.construct().execute(statements, handler);
+    Assert.assertEquals(2, collected.size());
+    Assert.assertEquals(4, collected.get(0).get(0));
+    Assert.assertEquals(5, collected.get(1).get(0));
+  }
+}