You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2021/09/09 14:50:41 UTC

[drill] branch master updated: DRILL-7984: Support clickhouse by JDBC plugin (#2290)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new dac5b7e  DRILL-7984: Support clickhouse by JDBC plugin (#2290)
dac5b7e is described below

commit dac5b7e5d8079753b51a7b513d9fbeb051dfe0d7
Author: leon <32...@qq.com>
AuthorDate: Thu Sep 9 22:50:34 2021 +0800

    DRILL-7984: Support clickhouse by JDBC plugin (#2290)
    
    * support clickhouse
    
    * add ClickhouseDialect
    
    * change jdbc writers
    
    * add license
    
    * add ClickhouseCatalogSchema
    
    * add JdbcSqlGenerator
    
    * add license header
    
    * remove static import
    
    * fix import problem
    
    * add JsonIgnore
    
    * refactor
    
    * add interface
    
    * add license
    
    * add ut
    
    * add pom
    
    * restart ci
    
    * update drill-calcite version
    
    * Fix testPhysicalPlanSubmission
---
 contrib/storage-jdbc/pom.xml                       |  19 ++
 .../exec/store/jdbc/CapitalizingJdbcSchema.java    |   5 +-
 .../drill/exec/store/jdbc/DefaultJdbcDialect.java  |  54 ++++
 .../drill/exec/store/jdbc/DrillJdbcConvention.java |   2 +-
 .../apache/drill/exec/store/jdbc/JdbcDialect.java  |  47 ++++
 .../JdbcIntWriter.java => JdbcDialectFactory.java} |  25 +-
 .../org/apache/drill/exec/store/jdbc/JdbcPrel.java |  24 +-
 .../drill/exec/store/jdbc/JdbcStoragePlugin.java   |  15 +-
 .../jdbc/clickhouse/ClickhouseCatalogSchema.java   | 107 ++++++++
 .../jdbc/clickhouse/ClickhouseJdbcDialect.java     |  56 ++++
 .../jdbc/clickhouse/ClickhouseJdbcImplementor.java |  46 ++++
 .../exec/store/jdbc/writers/JdbcBigintWriter.java  |   6 +-
 .../exec/store/jdbc/writers/JdbcBitWriter.java     |   5 +-
 .../exec/store/jdbc/writers/JdbcDoubleWriter.java  |   3 +-
 .../exec/store/jdbc/writers/JdbcFloatWriter.java   |   3 +-
 .../exec/store/jdbc/writers/JdbcIntWriter.java     |   3 +-
 .../store/jdbc/TestJdbcPluginWithClickhouse.java   | 301 +++++++++++++++++++++
 .../src/test/resources/clickhouse-test-data.sql    |  47 ++++
 pom.xml                                            |   2 +-
 19 files changed, 720 insertions(+), 50 deletions(-)

diff --git a/contrib/storage-jdbc/pom.xml b/contrib/storage-jdbc/pom.xml
index fdbe890..2103e8f 100755
--- a/contrib/storage-jdbc/pom.xml
+++ b/contrib/storage-jdbc/pom.xml
@@ -32,6 +32,7 @@
 
   <properties>
     <mysql.connector.version>8.0.25</mysql.connector.version>
+    <clickhouse.jdbc.version>0.3.1</clickhouse.jdbc.version>
     <h2.version>1.4.200</h2.version>
   </properties>
 
@@ -74,6 +75,24 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>ru.yandex.clickhouse</groupId>
+      <artifactId>clickhouse-jdbc</artifactId>
+      <version>${clickhouse.jdbc.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>commons-logging</artifactId>
+          <groupId>commons-logging</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>clickhouse</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
       <version>${h2.version}</version>
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index c4e500f..4ab3947 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -35,7 +35,7 @@ import org.apache.drill.exec.store.AbstractSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class CapitalizingJdbcSchema extends AbstractSchema {
+public class CapitalizingJdbcSchema extends AbstractSchema {
 
   private static final Logger logger = LoggerFactory.getLogger(CapitalizingJdbcSchema.class);
 
@@ -43,7 +43,8 @@ class CapitalizingJdbcSchema extends AbstractSchema {
   private final JdbcSchema inner;
   private final boolean caseSensitive;
 
-  CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
+  public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name,
+                          DataSource dataSource,
                          SqlDialect dialect, JdbcConvention convention, String catalog, String schema, boolean caseSensitive) {
     super(parentSchemaPath, name);
     this.schemaMap = new HashMap<>();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
new file mode 100644
index 0000000..64c0ce3
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SubsetRemover;
+
+public class DefaultJdbcDialect implements JdbcDialect {
+  private final JdbcStoragePlugin plugin;
+
+  public DefaultJdbcDialect(JdbcStoragePlugin plugin) {
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
+      JdbcCatalogSchema schema = new JdbcCatalogSchema(plugin.getName(),
+        plugin.getDataSource(), plugin.getDialect(), plugin.getConvention(),
+        !plugin.getConfig().areTableNamesCaseInsensitive());
+      SchemaPlus holder = parent.add(plugin.getName(), schema);
+      schema.setHolder(holder);
+  }
+
+  @Override
+  public String generateSql(RelOptCluster cluster, RelNode input) {
+    final SqlDialect dialect = plugin.getDialect();
+    final JdbcImplementor jdbcImplementor = new JdbcImplementor(dialect,
+        (JavaTypeFactory) cluster.getTypeFactory());
+    final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
+      input.accept(SubsetRemover.INSTANCE));
+    return result.asStatement().toSqlString(dialect).getSql();
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index a47a4f2..2534954 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -41,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 /**
  * Convention with set of rules to register for jdbc plugin
  */
-class DrillJdbcConvention extends JdbcConvention {
+public class DrillJdbcConvention extends JdbcConvention {
 
   /**
    * Unwanted Calcite's JdbcRules are filtered out using this set
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialect.java
new file mode 100644
index 0000000..ff54afd
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialect.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.SchemaConfig;
+
+/**
+ * Interface for different implementations of databases connected using the
+ * JdbcStoragePlugin.
+ */
+public interface JdbcDialect {
+
+  /**
+   * Register the schemas provided by this JdbcDialect implementation under the
+   * given parent schema.
+   *
+   * @param config Configuration for schema objects.
+   * @param parent Reference to parent schema.
+   */
+  void registerSchemas(SchemaConfig config, SchemaPlus parent);
+
+  /**
+   * Generate sql from relational expressions.
+   *
+   * @param cluster An environment for related relational expressions.
+   * @param input Relational expressions.
+   */
+  String generateSql(RelOptCluster cluster, RelNode input);
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
similarity index 60%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
copy to contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
index 5b4704c..9c10cf7 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
@@ -15,25 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.drill.exec.store.jdbc;
 
-package org.apache.drill.exec.store.jdbc.writers;
+import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
 
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+public class JdbcDialectFactory {
+  public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-public class JdbcIntWriter extends JdbcColumnWriter {
-
-  public JdbcIntWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
-    super(colName, rowWriter, columnIndex);
-  }
-
-  @Override
-  public void load(ResultSet results) throws SQLException {
-    if (!results.wasNull()) {
-      int value = results.getInt(columnIndex);
-      columnWriter.setInt(value);
+  public static JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, String url) {
+    if (url.startsWith(JDBC_CLICKHOUSE_PREFIX)) {
+      return new ClickhouseJdbcDialect(plugin);
+    } else {
+      return new DefaultJdbcDialect(plugin);
     }
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index 815e43f..487c8cc 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -17,13 +17,6 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import java.util.List;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.adapter.jdbc.JdbcImplementor;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -31,14 +24,17 @@ import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.sql.SqlDialect;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.store.SubsetRemover;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * Represents a JDBC Plan once the children nodes have been rewritten into SQL.
@@ -53,15 +49,7 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
     final RelNode input = prel.getInput();
     rows = input.estimateRowCount(cluster.getMetadataQuery());
     convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
-
-    // generate sql for tree.
-    final SqlDialect dialect = convention.getPlugin().getDialect();
-    final JdbcImplementor jdbcImplementor = new JdbcImplementor(
-        dialect,
-        (JavaTypeFactory) getCluster().getTypeFactory());
-    final JdbcImplementor.Result result =
-        jdbcImplementor.visitChild(0, input.accept(SubsetRemover.INSTANCE));
-    sql = result.asStatement().toSqlString(dialect).getSql();
+    sql = convention.getPlugin().getJdbcDialect().generateSql(getCluster(), input);
     rowType = input.getRowType();
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index f707f30..8a5cd41 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -47,6 +47,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
   private final HikariDataSource dataSource;
   private final SqlDialect dialect;
   private final DrillJdbcConvention convention;
+  private final JdbcDialect jdbcDialect;
 
   public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
     super(context, name);
@@ -54,14 +55,20 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
     this.dataSource = initDataSource(config);
     this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
     this.convention = new DrillJdbcConvention(dialect, name, this);
+    this.jdbcDialect = JdbcDialectFactory.getJdbcDialect(this, config.getUrl());
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    JdbcCatalogSchema schema = new JdbcCatalogSchema(getName(), dataSource, dialect, convention,
-        !this.config.areTableNamesCaseInsensitive());
-    SchemaPlus holder = parent.add(getName(), schema);
-    schema.setHolder(holder);
+    this.jdbcDialect.registerSchemas(config, parent);
+  }
+
+  public JdbcDialect getJdbcDialect() {
+    return jdbcDialect;
+  }
+
+  public DrillJdbcConvention getConvention() {
+    return convention;
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseCatalogSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseCatalogSchema.java
new file mode 100644
index 0000000..a1bcd42
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseCatalogSchema.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc.clickhouse;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.jdbc.CapitalizingJdbcSchema;
+import org.apache.drill.exec.store.jdbc.DrillJdbcConvention;
+import org.apache.drill.exec.store.jdbc.JdbcStorageConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ClickhouseCatalogSchema extends AbstractSchema {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClickhouseCatalogSchema.class);
+  private final Map<String, CapitalizingJdbcSchema> schemaMap;
+  private final CapitalizingJdbcSchema defaultSchema;
+
+  public ClickhouseCatalogSchema(String name, DataSource source, SqlDialect dialect, DrillJdbcConvention convention) {
+    super(Collections.emptyList(), name);
+    this.schemaMap = new HashMap<>();
+    String connectionSchemaName = null;
+    try (Connection con = source.getConnection();
+         ResultSet set = con.getMetaData().getSchemas()) {
+      connectionSchemaName = con.getSchema();
+      while (set.next()) {
+        final String schemaName = set.getString(1);
+        final String catalogName = set.getString(2);
+        schemaMap.put(schemaName, new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect,
+          convention, catalogName, schemaName, false));
+      }
+    } catch (SQLException e) {
+      logger.error("Failure while attempting to load clickhouse schema.", e);
+    }
+    defaultSchema = determineDefaultSchema(connectionSchemaName);
+  }
+
+  private CapitalizingJdbcSchema determineDefaultSchema(String connectionSchemaName) {
+    CapitalizingJdbcSchema schema = schemaMap.get(connectionSchemaName);
+    if (schema == null) {
+      return schemaMap.values().iterator().next();
+    } else {
+      return schema;
+    }
+  }
+
+  public void setHolder(SchemaPlus plusOfThis) {
+    for (Map.Entry<String, CapitalizingJdbcSchema> entry : schemaMap.entrySet()) {
+      plusOfThis.add(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public String getTypeName() {
+    return JdbcStorageConfig.NAME;
+  }
+
+  @Override
+  public Schema getDefaultSchema() {
+    return defaultSchema;
+  }
+
+  @Override
+  public Table getTable(String name) {
+    if (defaultSchema != null) {
+      try {
+        return defaultSchema.getTable(name);
+      } catch (RuntimeException e) {
+        logger.warn("Failure while attempting to read table '{}' from {}.",
+          name, this.getClass().getSimpleName(), e);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return defaultSchema.getTableNames();
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
new file mode 100644
index 0000000..9251560
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc.clickhouse;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.jdbc.JdbcDialect;
+import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin;
+
+public class ClickhouseJdbcDialect implements JdbcDialect {
+
+  private final JdbcStoragePlugin plugin;
+
+  public ClickhouseJdbcDialect(JdbcStoragePlugin plugin) {
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
+    ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(plugin.getName(),
+      plugin.getDataSource(), plugin.getDialect(), plugin.getConvention());
+    SchemaPlus holder = parent.add(plugin.getName(), schema);
+    schema.setHolder(holder);
+  }
+
+  @Override
+  public String generateSql(RelOptCluster cluster, RelNode input) {
+    final SqlDialect dialect = plugin.getDialect();
+    final JdbcImplementor jdbcImplementor = new ClickhouseJdbcImplementor(dialect,
+      (JavaTypeFactory) cluster.getTypeFactory());
+    final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
+      input.accept(SubsetRemover.INSTANCE));
+    return result.asStatement().toSqlString(dialect).getSql();
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcImplementor.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcImplementor.java
new file mode 100644
index 0000000..c3af184
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcImplementor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc.clickhouse;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.adapter.jdbc.JdbcTableScan;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+
+import java.util.Iterator;
+
+public class ClickhouseJdbcImplementor extends JdbcImplementor {
+  public ClickhouseJdbcImplementor(SqlDialect dialect,
+                                   JavaTypeFactory typeFactory) {
+    super(dialect, typeFactory);
+  }
+
+  @Override
+  public Result visit(JdbcTableScan scan) {
+    SqlIdentifier sqlIdentifier = scan.jdbcTable.tableName();
+    Iterator<String> iter = sqlIdentifier.names.iterator();
+    Preconditions.checkArgument(sqlIdentifier.names.size() == 3,
+      "size of clickhouse table names:[%s] is not 3", sqlIdentifier.toString());
+    iter.next();
+    sqlIdentifier.setNames(ImmutableList.copyOf(iter), null);
+    return result(sqlIdentifier, ImmutableList.of(Clause.FROM), scan, null);
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
index 09b7d12..31c9367 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
@@ -31,9 +31,9 @@ public class JdbcBigintWriter extends JdbcColumnWriter {
 
   @Override
   public void load(ResultSet results) throws SQLException {
-    boolean b = results.wasNull();
-    if (! results.wasNull()) {
-      long value = results.getLong(columnIndex);
+    // JDBC reports nullability only after getting the column value.
+    long value = results.getLong(columnIndex);
+    if (!results.wasNull()) {
       columnWriter.setLong(value);
     }
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
index 29d688f..433c631 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
@@ -31,8 +31,9 @@ public class JdbcBitWriter extends JdbcColumnWriter {
 
   @Override
   public void load(ResultSet results) throws SQLException {
-    if (! results.wasNull()) {
-      boolean value = results.getBoolean(columnIndex);
+    // JDBC reports nullability only after getting the column value.
+    boolean value = results.getBoolean(columnIndex);
+    if (!results.wasNull()) {
       columnWriter.setBoolean(value);
     }
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
index 95d9402..8d23e29 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
@@ -31,8 +31,9 @@ public class JdbcDoubleWriter extends JdbcColumnWriter {
 
   @Override
   public void load(ResultSet results) throws SQLException {
+    // JDBC reports nullability only after getting the column value.
+    double value = results.getDouble(columnIndex);
     if (!results.wasNull()) {
-      double value = results.getDouble(columnIndex);
       columnWriter.setDouble(value);
     }
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
index fd2188d..52b12c5 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
@@ -31,8 +31,9 @@ public class JdbcFloatWriter extends JdbcColumnWriter {
 
   @Override
   public void load(ResultSet results) throws SQLException {
+    // JDBC reports nullability only after getting the column value.
+    float value = results.getFloat(columnIndex);
     if (!results.wasNull()) {
-      float value = results.getFloat(columnIndex);
       columnWriter.setFloat(value);
     }
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
index 5b4704c..4bfa29a 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
@@ -31,8 +31,9 @@ public class JdbcIntWriter extends JdbcColumnWriter {
 
   @Override
   public void load(ResultSet results) throws SQLException {
+    // JDBC reports nullability only after getting the column value.
+    int value = results.getInt(columnIndex);
     if (!results.wasNull()) {
-      int value = results.getInt(columnIndex);
       columnWriter.setInt(value);
     }
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
new file mode 100644
index 0000000..d2d697f
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * JDBC storage plugin tests against Clickhouse.
+ */
+@Category(JdbcStorageTest.class)
+public class TestJdbcPluginWithClickhouse extends ClusterTest {
+
+  private static final String DOCKER_IMAGE_CLICKHOUSE = "yandex/clickhouse" +
+    "-server:21.8.4.51";
+  private static JdbcDatabaseContainer<?> jdbcContainer;
+
+  @BeforeClass
+  public static void initClickhouse() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    jdbcContainer =
+      new ClickHouseContainer(DockerImageName.parse(DOCKER_IMAGE_CLICKHOUSE))
+        .withInitScript("clickhouse-test-data.sql");
+    jdbcContainer.start();
+
+    JdbcStorageConfig jdbcStorageConfig =
+      new JdbcStorageConfig("ru.yandex.clickhouse.ClickHouseDriver",
+        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), null,
+        true, null, null);
+    jdbcStorageConfig.setEnabled(true);
+    cluster.defineStoragePlugin("clickhouse", jdbcStorageConfig);
+  }
+
+  @AfterClass
+  public static void stopClickhouse() {
+    if (jdbcContainer != null) {
+      jdbcContainer.stop();
+    }
+  }
+
+  @Test
+  public void validateResult() throws Exception {
+    testBuilder()
+        .sqlQuery(
+            "select person_id, first_name, last_name, address, city, state, zip, " +
+              "json, bigint_field, smallint_field, decimal_field, boolean_field, " +
+              "double_field, float_field, date_field, datetime_field, enum_field " +
+            "from clickhouse.`default`.person order by person_id")
+        .ordered()
+        .baselineColumns("person_id", "first_name", "last_name", "address",
+          "city", "state", "zip", "json", "bigint_field", "smallint_field",
+          "decimal_field", "boolean_field", "double_field", "float_field",
+          "date_field", "datetime_field", "enum_field")
+        .baselineValues(1, "first_name_1", "last_name_1", "1401 John F Kennedy Blvd",
+          "Philadelphia", "PA", 19107, "{ a : 5, b : 6 }", 123456789L, 1,
+          new BigDecimal("123.32"), 0, 1.0, 1.1,
+          DateUtility.parseLocalDate("2012-02-29"),
+          DateUtility.parseLocalDateTime("2012-02-29 13:00:01.0"), "XXX")
+        .baselineValues(2, "first_name_2", "last_name_2", "One Ferry Building",
+          "San Francisco", "CA", 94111, "{ z : [ 1, 2, 3 ] }", 45456767L, 3,
+          null, 1, 3.0, 3.1,
+          DateUtility.parseLocalDate("2011-10-30"),
+          DateUtility.parseLocalDateTime("2011-10-30 11:34:21.0"), "YYY")
+        .baselineValues(3, "first_name_3", "last_name_3", "176 Bowery",
+          "New York", "NY", 10012, "{ [ a, b, c ] }", 123090L, -3,
+          null, 0, 5.0, 5.1,
+          DateUtility.parseLocalDate("2015-06-01"),
+          DateUtility.parseLocalDateTime("2015-09-22 15:46:10.0"), "ZZZ")
+        .baselineValues(4, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null, "XXX")
+        .go();
+  }
+
+  @Test
+  public void pushDownJoin() throws Exception {
+    String query = "select x.person_id from (select person_id from clickhouse.`default`.person) x "
+            + "join (select person_id from clickhouse.`default`.person) y on x.person_id = y.person_id";
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Join")
+        .match();
+  }
+
+  @Test
+  public void pushDownJoinAndFilterPushDown() throws Exception {
+    String query = "select * from " +
+            "clickhouse.`default`.person e " +
+            "INNER JOIN " +
+            "clickhouse.`default`.person s " +
+            "ON e.first_name = s.first_name " +
+            "WHERE e.last_name > 'hello'";
+
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Join", "Filter")
+        .match();
+  }
+
+  @Test
+  public void pushDownAggWithDecimal() throws Exception {
+    String query = "SELECT sum(decimal_field * smallint_field) AS `order_total`\n" +
+        "FROM clickhouse.`default`.person e";
+
+    DirectRowSet results = queryBuilder().sql(query).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("order_total", TypeProtos.MinorType.VARDECIMAL, 38, 2)
+        .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(123.32)
+        .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    String query = "select * from clickhouse.`default`.person";
+    String plan = queryBuilder().sql(query).explainJson();
+    assertEquals(4, queryBuilder().physical(plan).run().recordCount());
+  }
+
+  @Test
+  public void emptyOutput() {
+    String query = "select * from clickhouse.`default`.person e limit 0";
+
+    testBuilder()
+        .sqlQuery(query)
+        .expectsEmptyResultSet();
+  }
+
+  @Test
+  public void testExpressionsWithoutAlias() throws Exception {
+    String query = "select count(*), 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2\n" +
+        "from clickhouse.`default`.person";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2")
+        .baselineValues(4L, 88L, 1.618033988749895)
+        .go();
+  }
+
+  @Test
+  public void testExpressionsWithoutAliasesPermutations() throws Exception {
+    String query = "select EXPR$1, EXPR$0, EXPR$2\n" +
+        "from (select 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2, count(*) from clickhouse.`default`.person)";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("EXPR$1", "EXPR$0", "EXPR$2")
+        .baselineValues(1.618033988749895, 88L, 4L)
+        .go();
+  }
+
+  @Test
+  public void testExpressionsWithAliases() throws Exception {
+    String query = "select person_id as ID, 1+1+2+3+5+8+13+21+34 as FIBONACCI_SUM, (1+sqrt(5))/2 as golden_ratio\n" +
+        "from clickhouse.`default`.person limit 2";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("ID", "FIBONACCI_SUM", "golden_ratio")
+        .baselineValues(1, 88L, 1.618033988749895)
+        .baselineValues(2, 88L, 1.618033988749895)
+        .go();
+  }
+
+  @Test
+  public void testJoinStar() throws Exception {
+    String query = "select * from (select person_id from clickhouse.`default`.person) t1 join " +
+        "(select person_id from clickhouse.`default`.person) t2 on t1.person_id = t2.person_id";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("person_id", "person_id0")
+        .baselineValues(1, 1)
+        .baselineValues(2, 2)
+        .baselineValues(3, 3)
+        .baselineValues(4, 4)
+        .go();
+  }
+
+  @Test
+  public void testSemiJoin() throws Exception {
+    String query =
+        "select person_id from clickhouse.`default`.person t1\n" +
+            "where exists (" +
+                "select person_id from clickhouse.`default`.person\n" +
+                "where t1.person_id = person_id)";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("person_id")
+        .baselineValuesForSingleColumn(1, 2, 3, 4)
+        .go();
+  }
+
+  @Test
+  public void testInformationSchemaViews() throws Exception {
+    String query = "select * from information_schema.`views`";
+    run(query);
+  }
+
+  @Test
+  public void testJdbcTableTypes() throws Exception {
+    String query = "select distinct table_type from information_schema.`tables` " +
+        "where table_schema like 'clickhouse%'";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("table_type")
+        .baselineValuesForSingleColumn("TABLE", "VIEW")
+        .go();
+  }
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    String query = "select person_id, first_name, last_name from clickhouse.`default`.person limit 100";
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("Jdbc\\(.*LIMIT 100")
+        .exclude("Limit\\(")
+        .match();
+  }
+
+  @Test
+  public void testLimitPushDownWithOrderBy() throws Exception {
+    String query = "select person_id from clickhouse.`default`.person order by first_name limit 100";
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("Jdbc\\(.*ORDER BY `first_name`.*LIMIT 100")
+        .exclude("Limit\\(")
+        .match();
+  }
+
+  @Test
+  public void testLimitPushDownWithOffset() throws Exception {
+    String query = "select person_id, first_name from clickhouse.`default`.person limit 100 offset 10";
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("Jdbc\\(.*LIMIT 10, 100")
+        .exclude("Limit\\(")
+        .match();
+  }
+
+  @Test
+  public void testLimitPushDownWithConvertFromJson() throws Exception {
+    String query = "select convert_fromJSON(first_name)['ppid'] from clickhouse.`default`.person LIMIT 100";
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("Jdbc\\(.*LIMIT 100")
+        .exclude("Limit\\(")
+        .match();
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/resources/clickhouse-test-data.sql b/contrib/storage-jdbc/src/test/resources/clickhouse-test-data.sql
new file mode 100644
index 0000000..56c62b4
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/resources/clickhouse-test-data.sql
@@ -0,0 +1,47 @@
+create table person (
+  person_id       Int32,
+
+  first_name      Nullable(String),
+  last_name       Nullable(String),
+  address         Nullable(String),
+  city            Nullable(String),
+  state           Nullable(String),
+  zip             Nullable(Int32),
+
+  json            Nullable(String),
+
+  bigint_field    Nullable(Int64),
+  smallint_field  Nullable(Int16),
+  decimal_field   Nullable(DECIMAL(15, 2)),
+  boolean_field   Nullable(UInt8),
+  double_field    Nullable(Float64),
+  float_field     Nullable(Float32),
+
+  date_field      Nullable(Date),
+  datetime_field  Nullable(Datetime),
+  enum_field      Enum('XXX'=1, 'YYY'=2, 'ZZZ'=3)
+) ENGINE = MergeTree() order by person_id;
+
+insert into person (person_id, first_name, last_name, address, city, state, zip, json,
+                    bigint_field, smallint_field, decimal_field, boolean_field, double_field,
+                    float_field, date_field, datetime_field, enum_field)
+values (1, 'first_name_1', 'last_name_1', '1401 John F Kennedy Blvd', 'Philadelphia', 'PA',
+        19107, '{ a : 5, b : 6 }', 123456789, 1, 123.321, 0, 1.0, 1.1, '2012-02-29',
+        '2012-02-29 13:00:01', 'XXX');
+
+insert into person (person_id, first_name, last_name, address, city, state, zip, json,
+                    bigint_field, smallint_field, boolean_field, double_field,
+                    float_field, date_field, datetime_field, enum_field)
+values (2, 'first_name_2', 'last_name_2', 'One Ferry Building', 'San Francisco', 'CA', 94111,
+        '{ z : [ 1, 2, 3 ] }', 45456767, 3, 1, 3.0, 3.1, '2011-10-30',
+        '2011-10-30 11:34:21', 'YYY');
+
+insert into person (person_id, first_name, last_name, address, city, state, zip, json,
+                    bigint_field, smallint_field, boolean_field, double_field,
+                    float_field, date_field, datetime_field, enum_field)
+values (3, 'first_name_3', 'last_name_3', '176 Bowery', 'New York', 'NY', 10012, '{ [ a, b, c ] }',
+        123090, -3, 0, 5.0, 5.1, '2015-06-01', '2015-09-22 15:46:10', 'ZZZ');
+
+insert into person (person_id) values (4);
+
+create view person_view as select * from person;
diff --git a/pom.xml b/pom.xml
index 9894041..ed2968b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
       avoid_bad_dependencies plugin found in the file.
     -->
     <calcite.groupId>com.github.vvysotskyi.drill-calcite</calcite.groupId>
-    <calcite.version>1.21.0-drill-r3</calcite.version>
+    <calcite.version>1.21.0-drill-r4</calcite.version>
     <avatica.version>1.17.0</avatica.version>
     <janino.version>3.0.11</janino.version>
     <sqlline.version>1.9.0</sqlline.version>