You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:51 UTC

[05/20] storm git commit: [StormSQL] STORM-1149. Support pluggable data sources in CREATE TABLE.

[StormSQL] STORM-1149. Support pluggable data sources in CREATE TABLE.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0bb8e46c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0bb8e46c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0bb8e46c

Branch: refs/heads/master
Commit: 0bb8e46cea7fa162c1fd16203e1e415e13636fb1
Parents: 915f135
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 2 15:35:29 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/DataSourcesProvider.java   |  47 +++++++++
 .../apache/storm/sql/DataSourcesRegistry.java   |  66 ++++++++++++
 .../src/jvm/org/apache/storm/sql/StormSql.java  |  42 ++++++++
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 100 +++++++++++++++++++
 .../storm/sql/parser/ColumnDefinition.java      |  12 +++
 .../apache/storm/sql/parser/SqlCreateTable.java |  25 +++++
 .../apache/storm/sql/parser/StormParser.java    |   2 +-
 .../test/org/apache/storm/sql/TestStormSql.java |  73 ++++++++++++++
 8 files changed, 366 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
new file mode 100644
index 0000000..cc4874b
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public interface DataSourcesProvider {
+  /**
+   * @return the URI scheme served by this provider
+   */
+  String scheme();
+
+  /**
+   * Construct a new data source.
+   * @param uri the URI of the data source; its format is provider-defined.
+   * @param inputFormatClass the name of the class that deserializes data,
+   *                         or null when unspecified.
+   * @param outputFormatClass the name of the class that serializes data,
+   *                          or null when unspecified.
+   * @param fields one (name, Java class) entry per column of the table.
+   * @return the data source for the table described by the arguments.
+   */
+  DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<Map.Entry<String, Class<?>>> fields);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
new file mode 100644
index 0000000..a841609
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.sql.storm.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/** Maps URI schemes to the providers discovered through ServiceLoader. */
+public class DataSourcesRegistry {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataSourcesRegistry.class);
+  private static final Map<String, DataSourcesProvider> providers =
+      new HashMap<>();
+
+  static {
+    for (DataSourcesProvider p : ServiceLoader.load(
+        DataSourcesProvider.class)) {
+      LOG.info("Registering scheme {} with {}", p.scheme(), p);
+      providers.put(p.scheme(), p);
+    }
+  }
+
+  private DataSourcesRegistry() {}
+
+  /**
+   * Builds the source via the provider of the URI's scheme; null if none.
+   */
+  public static DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<Map.Entry<String, Class<?>>> fields) {
+    DataSourcesProvider handler = providers.get(uri.getScheme());
+    return handler == null ? null : handler.construct(
+        uri, inputFormatClass, outputFormatClass, fields);
+  }
+
+  /** Returns the live provider map so that tests can register mocks. */
+  @VisibleForTesting
+  static Map<String, DataSourcesProvider> providerMap() {
+    return providers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
new file mode 100644
index 0000000..477e633
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.util.Map;
+
+/**
+ * The StormSql class provides standalone, interactive interfaces to execute
+ * SQL statements over streaming data.
+ * <p>
+ * The StormSql class is stateless: the data definition language (DDL)
+ * statements and the query statements must be submitted in the same batch
+ * passed to {@link #execute}. Instances are obtained via {@link #construct}.
+ */
+public abstract class StormSql {
+  public abstract void execute(Iterable<String> statements,
+      ChannelHandler handler) throws Exception;
+
+  public static StormSql construct() {
+    return new StormSqlImpl();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
new file mode 100644
index 0000000..136bc88
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.storm.sql.compiler.PlanCompiler;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
+
+class StormSqlImpl extends StormSql {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  private final SchemaPlus schema = Frameworks.createRootSchema(true);
+
+  @Override
+  public void execute(Iterable<String> statements, ChannelHandler result)
+      throws Exception {
+    Map<String, DataSource> dataSources = new HashMap<>();
+    for (String sql : statements) {
+      SqlNode node = new StormParser(sql).impl().parseSqlStmtEof();
+      if (node instanceof SqlCreateTable) {
+        handleCreateTable((SqlCreateTable) node, dataSources);
+      } else {
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+            schema).build();
+        Planner planner = Frameworks.getPlanner(config);
+        RelNode tree = planner.convert(planner.validate(planner.parse(sql)));
+        PlanCompiler compiler = new PlanCompiler(typeFactory);
+        AbstractValuesProcessor proc = compiler.compile(tree);
+        proc.initialize(dataSources, result);
+      }
+    }
+  }
+
+  /**
+   * Registers the table and its data source. Both failure cases are checked
+   * before the schema is touched, so a rejected DDL leaves no stale table.
+   * @throws RuntimeException if the table is a duplicate or has no data source.
+   */
+  private void handleCreateTable(
+      SqlCreateTable n, Map<String, DataSource> dataSources) {
+    String name = n.tableName();
+    if (dataSources.containsKey(name)) {
+      throw new RuntimeException("Duplicated definition for table " + name);
+    }
+    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
+    List<Map.Entry<String, Class<?>>> fields = new ArrayList<>();
+    for (ColumnDefinition col : n.fieldList()) {
+      builder.field(col.name(), col.type());
+      RelDataType dataType = col.type().deriveType(typeFactory);
+      fields.add(new AbstractMap.SimpleImmutableEntry<String, Class<?>>(
+          col.name(), (Class<?>) typeFactory.getJavaClass(dataType)));
+    }
+    DataSource ds = DataSourcesRegistry.construct(
+        n.location(), n.inputFormatClass(), n.outputFormatClass(), fields);
+    if (ds == null) {
+      throw new RuntimeException("Cannot construct data source for " + name);
+    }
+    Table table = builder.build();
+    schema.add(name, table);
+    dataSources.put(name, ds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
index 27f2e57..3520b86 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
@@ -29,4 +29,16 @@ public class ColumnDefinition extends SqlNodeList {
       SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
     super(Arrays.asList(name, type, constraint), pos);
   }
+
+  public String name() {
+    return get(0).toString(); // element 0 is the column name identifier
+  }
+
+  public SqlDataTypeSpec type() {
+    return (SqlDataTypeSpec) get(1); // element 1 is the column type spec
+  }
+
+  public ColumnConstraint constraint() {
+    return (ColumnConstraint) get(2); // element 2 is the column constraint
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index e81d146..8fe4160 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
+import java.net.URI;
 import java.util.List;
 
 public class SqlCreateTable extends SqlCall {
@@ -102,4 +103,28 @@ public class SqlCreateTable extends SqlCall {
                                     outputFormatClass, location, properties,
                                     query);
   }
+
+  public String tableName() {
+    return tblName.toString(); // keys both the schema and data source maps
+  }
+
+  public URI location() { // throws IllegalArgumentException on an invalid URI
+    return URI.create(SqlLiteral.stringValue(location));
+  }
+
+  public String inputFormatClass() { // null when no input format was given
+    return inputFormatClass == null ? null : SqlLiteral.stringValue(
+        inputFormatClass);
+  }
+
+  public String outputFormatClass() { // null when no output format was given
+    return outputFormatClass == null ? null : SqlLiteral.stringValue
+        (outputFormatClass);
+  }
+
+  @SuppressWarnings("unchecked") // cast assumes only ColumnDefinition nodes
+  public List<ColumnDefinition> fieldList() {
+    return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
index 9c74f28..670901e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -36,7 +36,7 @@ public class StormParser {
   }
 
   @VisibleForTesting
-  StormParserImpl impl() {
+  public StormParserImpl impl() {
     return impl;
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0bb8e46c/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
new file mode 100644
index 0000000..07b367f
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.compiler.TestUtils;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Runs StormSql end to end against a data source registered as "mock". */
+public class TestStormSql {
+  /** Provider for the "mock" scheme that always returns a MockDataSource. */
+  private static class MockDataSourceProvider implements DataSourcesProvider {
+    @Override
+    public String scheme() {
+      return "mock";
+    }
+
+    @Override
+    public DataSource construct(
+        URI uri, String inputFormatClass, String outputFormatClass,
+        List<Map.Entry<String, Class<?>>> fields) {
+      return new TestUtils.MockDataSource();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    DataSourcesRegistry.providerMap().remove("mock");
+  }
+
+  @Test
+  public void testExternalDataSource() throws Exception {
+    List<String> batch = new ArrayList<>();
+    batch.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    batch.add("SELECT ID + 1 FROM FOO WHERE ID > 2");
+    List<Values> collected = new ArrayList<>();
+    ChannelHandler sink = new TestUtils.CollectDataChannelHandler(collected);
+    StormSql.construct().execute(batch, sink);
+    Assert.assertEquals(2, collected.size());
+    Assert.assertEquals(4, collected.get(0).get(0));
+    Assert.assertEquals(5, collected.get(1).get(0));
+  }
+}