You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/10/06 22:19:42 UTC

[GitHub] [drill] cgivre opened a new pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

cgivre opened a new pull request #2327:
URL: https://github.com/apache/drill/pull/2327


   # [DRILL-8005](https://issues.apache.org/jira/browse/DRILL-8005): Add Writer to JDBC Storage Plugin
   
   ## Description
   This PR adds the ability to write to JDBC storage.  Users will be able to execute the following queries against JDBC data sources. 
   
   - `CREATE TABLE AS` 
   - `CREATE TABLE IF NOT EXISTS`
   - `DROP TABLE` 
   
   ## Example Queries:
   
   ```sql
   CREATE TABLE IF NOT EXISTS pg.public.`t1` AS 
     SELECT int_field, float_field, varchar_field, boolean_field 
     FROM cp.`json/dataTypes.json`
   ```
   
   ### Known Limitations:
   * JDBC in general does not support complex types, and current implementation of this plugin will throw an exception if a user tries to write a complex field to a JDBC source. 
   * This PR attempts to be as generic as possible and as such, the translation between Drill data types and JDBC data types isn't always the same.  Specifically, various databases use different types for INT, FLOAT etc.  The plugin will default back to `NUMERIC` for most `FLOAT` types.
   * `VARBINARY` is not supported yet. 
   * Write capability was tested on MySQL, Postgres and H2.  Given the lack of standardization of DDL queries, there may be bugs when trying to write to other JDBC data sources.
   
   
   ## Documentation
   Documentation will be provided in a separate pull request.
   
   ## Testing
   This PR adds unit tests for the writer for MySQL, Postgres, and H2.   Additionally, this PR adds additional unit tests for the JDBC storage plugin and Postgres. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r726052225



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       @cgivre a little trick for producing 1e6 records
   ```
   set `planner.enable_nljoin_for_scalar_only` = false;
   create temporary table t as select o1.* from cp.`tpch/orders.parquet` o1 cross join cp.`tpch/orders.parquet` o2 limit 1e6;
   ```

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       @cgivre I think generating a test file of 1m records is a good thing to do at least once.  I don't know much about Drill's batching but I think of it as unrelated to the size limitations of VALUES expressions in external dbs.  If it were me I'd assume Drill could send batches bigger than the target db's VALUES limit and I'd write a loop in JdbcRecordWriter that inserts no more than ~500 records at a time, as outlined in my first comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-937326064


   This pull request **introduces 2 alerts** when merging 57d3fa6c62e0757a7b83376847113bd154b66608 into bad5e669d5241b56d34560e93bd93e670785e388 - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-564703efee05ee310aba77cb75edf59613ccc46d)
   
   **new alerts:**
   
   * 2 for Potential database resource leak


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725244859



##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * JDBC storage plugin tests against Postgres.
+ */
+@Category(JdbcStorageTest.class)
+public class TestJdbcPluginWithPostgres extends ClusterTest {

Review comment:
       Correct.  This probably should have been a separate PR.  I wanted to test the writer on a few different databases so I figured it made sense to duplicate the reader unit tests.   
   
   I'd like to add similar tests for MSSQL and Oracle, but getting the testcontainers to run was difficult.  I may do this later in a separate pull request.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725565805



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r726032662



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       I'm still learning about the writer API myself, so I'm figuring this out as we go, but I'm also not quite sure where you control the batch size.  I can see if I can figure that out. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre merged pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre merged pull request #2327:
URL: https://github.com/apache/drill/pull/2327


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730326926



##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
##########
@@ -210,6 +216,40 @@ public void testWithLargeFile() throws Exception {
     assertTrue(dropResults.succeeded());
   }
 
+  @Test
+  @Ignore("This is a slow test.  Please run manually.")
+  public void testWithReallyLongFile() throws Exception {
+    Path generatedFile = null;
+    try {
+      generatedFile = JdbcTestUtils.generateCsvFile("csv/very_large_file.csvh", 10, 100000);

Review comment:
       @luocooong Thanks for the comment, but if you look a little further, you'll see that we aren't actually storing a large file. There is a method which generates a csv file of a given size for the tests.  Once the tests are done, it deletes the files. There is a `large_csv` file, however it is only 300k, so .... we could get rid of that if you really want to.  
   
   For reference, Postgres has a limit of 1000 rows per insert, so I wanted to see if we'd break the writer with an insert of more than 1000 rows into Postgres.  The good news is that it didn't break!

##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
##########
@@ -370,22 +372,73 @@ public void testWithArrayField() throws Exception {
     }
   }
 
+  @Test
+  public void testWithLargeFile() throws Exception {
+    String query = "CREATE TABLE h2.tmp.`drill_h2_test`.`t2` (id,first_name,last_name,email,gender,ip_address) AS " +
+      "SELECT id,first_name,last_name,email,gender,ip_address FROM cp.`csv/large_csv.csvh`";
+    QuerySummary insertResults = queryBuilder().sql(query).run();
+    assertTrue(insertResults.succeeded());
+
+    query = "SELECT COUNT(*) FROM h2.tmp.`drill_h2_test`.`t2`";
+    long rowCount = queryBuilder().sql(query).singletonLong();
+    assertEquals(6000, rowCount);
+
+    // Now drop the table
+    String dropQuery = "DROP TABLE h2.tmp.`drill_h2_test`.`t2`";
+    QuerySummary dropResults = queryBuilder().sql(dropQuery).run();
+    assertTrue(dropResults.succeeded());
+  }
+
+  @Test
+  @Ignore("This is a slow test.  Please run manually.")

Review comment:
       YW!

##########
File path: docs/dev/CreatingAWriter.md
##########
@@ -0,0 +1,2 @@
+# Creating a Writer for a Storage Plugin
+This tutorial explains the mostly undocumented features of how to create a writer for a Drill storage plugin.  

Review comment:
       Done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-939585965


   @dzamo, @vvysotskyi 
   Thank you for your timely review on this.  I addressed all your comments.  Once the drill-calcite PR is merged, I should be able to remove the hack and (hopefully) it should be ready to go at that point. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-937326064






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725234809



##########
File path: .gitignore
##########
@@ -27,3 +27,4 @@ exec/jdbc-all/dependency-reduced-pom.xml
 .*.html
 venv/
 tools/venv/
+contrib/storage-jdbc/src/test/resources/logback-test.xml

Review comment:
       Removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r734153115



##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-server</artifactId>
+      <version>1.21.0-drill-r5</version>

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-937369004


   This pull request **introduces 2 alerts** when merging af78c5adffa507cc347c5af459831b5a49bbd3c4 into bad5e669d5241b56d34560e93bd93e670785e388 - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-af9c91b67b3e27ef095bb154efcac94f3f2ec6ff)
   
   **new alerts:**
   
   * 2 for Potential database resource leak


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725724986



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
##########
@@ -81,7 +80,12 @@ public boolean supportsRead() {
     return true;
   }
 
-  public DataSource getDataSource() {
+  @Override
+  public boolean supportsWrite() {
+    return config.isWritable();
+  }
+
+  public HikariDataSource getDataSource() {

Review comment:
       Nope..  Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r732301551



##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-949560720


   Thank you @dzamo @vvysotskyi and @luocooong for your review! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r732145508



##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>
+      <artifactId>calcite-server</artifactId>

Review comment:
       Calcite-core is included in the root `pom.xml` file.  The DDR operations which we needed for this are in the `calcite-server` module which was not listed as a dependency in the root `pom.xml`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724060422



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+  public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+  private final StringBuilder createTableQuery;
+  private SqlDialect dialect;
+  private StringBuilder columns;
+
+  public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+    if (Strings.isNullOrEmpty(tableName)) {
+      throw new UnsupportedOperationException("Table name cannot be empty");
+    }
+    this.dialect = dialect;
+    createTableQuery = new StringBuilder();
+    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+    columns = new StringBuilder();
+  }
+
+  /**
+   * Adds a column to the CREATE TABLE statement
+   * @param colName The column to be added to the table
+   * @param type The Drill MinorType of the column
+   * @param nullable If the column is nullable or not.
+   * @param precision The precision, or overall length of a column
+   * @param scale The scale, or number of digits after the decimal
+   */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+    StringBuilder queryText = new StringBuilder();
+    String jdbcColType = "";
+    try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+    } catch (NullPointerException e) {
+      // JDBC Does not support writing complex fields to databases
+      throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+        .addContext(colName + " is a complex type.")
+        .build(logger);
+    }
+
+    queryText.append(colName).append(" ").append(jdbcColType);
+
+    // Add precision or scale if applicable
+    if (jdbcColType.equals("VARCHAR")) {
+      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+      queryText.append("(").append(max_precision).append(")");
+    }
+
+    if (!nullable) {
+      queryText.append(" NOT NULL");
+    }
+
+    if (! Strings.isNullOrEmpty(columns.toString())) {
+      columns.append(",\n");
+    }
+
+    columns.append(queryText);
+  }
+
+  /**
+   * Generates the CREATE TABLE query.
+   * @return The create table query.
+   */
+  public String getCreateTableQuery() {
+    createTableQuery.append(columns);
+    createTableQuery.append("\n)");
+    return createTableQuery.toString();
+  }
+
+  @Override
+  public String toString() {
+    return getCreateTableQuery();
+  }
+
+  /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+   * @param table The table
+   * @param catalog The database catalog
+   * @param schema The database schema
+   * @return The table with catalog and schema added, if present
+   */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {

Review comment:
       Quite often RDBMSes allow spaces, and possibly other tricky characters, to be used in table names.  Trouble is that they differ in how they want want such identifiers enclosed in those cases e.g. `[spaced out table]` vs ```spaced out table```.  Do we want to raise an error or warning here if e.g. a regexp sees some of these characters?  Otherwise I think later `INSERT` statements could break.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();

Review comment:
       Do we need a `finally { conn.close() }` for this?

##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * JDBC storage plugin tests against Postgres.
+ */
+@Category(JdbcStorageTest.class)
+public class TestJdbcPluginWithPostgres extends ClusterTest {

Review comment:
       The contents of this class look like they're not writer-related and should have been there before...  was it just something we didn't have?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcCatalogSchema.java
##########
@@ -56,7 +56,7 @@
       while (set.next()) {
         final String catalogName = set.getString(1);
         CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
-            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive);
+            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive, convention.getPlugin());

Review comment:
       If CapitalizingJdbcSchema can only correctly use the JdbcStoragePlugin returned by `convention.getPlugin()` then I'd consider not adding to the constructor args here, but letting CapitalizingJdbcSchema do the lookup `convention.getPlugin()` itself, in the constructor.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcDDLQueryUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
+  /**
+   * Converts a given SQL query from the generic dialect to the destination system dialect.  Returns
+   * null if the original query is not valid.
+   *
+   * @param query An ANSI SQL statement
+   * @param dialect The destination system dialect
+   * @return A representation of the original query in the destination dialect
+   */
+  public static String cleanDDLQuery(String query, SqlDialect dialect) {
+    SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
+      .setParserFactory(SqlDdlParserImpl.FACTORY)
+      .setConformance(SqlConformanceEnum.MYSQL_5)
+      .setCaseSensitive(true)
+      .setLex(Lex.MYSQL_ANSI)
+      .build();
+
+    try {
+      SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
+      String cleanSQL =  node.toSqlString(dialect).getSql();
+
+      // TODO Fix this hack
+      // HACK  See CALCITE-4820 (https://issues.apache.org/jira/browse/CALCITE-4820)

Review comment:
       Did you see the response from the Calcite team on CALCITE-4820?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {
+    StringBuilder values = new StringBuilder();
+    for (int i = 0; i < insertRows.size(); i++) {
+      if (i > 0) {
+        values.append(",\n");
+      }
+      values.append(insertRows.get(i));
+    }
+
+    String sql = String.format(INSERT_QUERY_TEMPLATE, tableName, values);
+    return JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+  }
+
+  private String formatDateForInsertQuery(Long dateVal) {
+    Date date=new Date(dateVal);
+    SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
+    return df2.format(date);
+  }
+
+  private String formatTimeForInsertQuery(Integer millis) {
+    return String.format("%02d:%02d:%02d", TimeUnit.MILLISECONDS.toHours(millis),
+      TimeUnit.MILLISECONDS.toMinutes(millis) % TimeUnit.HOURS.toMinutes(1),
+      TimeUnit.MILLISECONDS.toSeconds(millis) % TimeUnit.MINUTES.toSeconds(1));
+  }
+
+  private String formatTimeStampForInsertQuery(Long time) {
+    Date date = new Date(time);
+    Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    return format.format(date);
+  }
+
+  @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableIntJDBCConverter(fieldId, fieldName, reader, fields);
+  }
+
+  public class NullableIntJDBCConverter extends FieldConverter {

Review comment:
       I guess maybe we could do something with a Freemarker template for the converters but I'm not convinced it's worth it now that we already have these written.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {

Review comment:
       It feels weird that we do the actual inserting in a method called `cleanup` - is this the right place?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       I think that the maximum number of records DBMSes allow in a `VALUES` expression is order 1e3 to 1e4.  If Drill batch sizes can exceed that we're going to have a problem.  A possible solution is to always partition into conservative insert batches of, say 500 records.  The `PreparedStatement` and `executeBatch` JDBC API usage in this answer https://stackoverflow.com/a/3786127/1153953 might help to keep things as efficient as possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725064774



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
##########
@@ -81,7 +80,12 @@ public boolean supportsRead() {
     return true;
   }
 
-  public DataSource getDataSource() {
+  @Override
+  public boolean supportsWrite() {
+    return config.isWritable();
+  }
+
+  public HikariDataSource getDataSource() {

Review comment:
       Is there any reason for returning `HikariDataSource` here instead of `DataSource`? We should not depend on the specific implementation.

##########
File path: docs/dev/CreatingAWriter.md
##########
@@ -0,0 +1,2 @@
+# Creating a Writer for a Storage Plugin
+This tutorial explains the mostly undocumented features of how to create a writer for a Drill storage plugin.  

Review comment:
       Looks like it should be added.

##########
File path: .gitignore
##########
@@ -27,3 +27,4 @@ exec/jdbc-all/dependency-reduced-pom.xml
 .*.html
 venv/
 tools/venv/
+contrib/storage-jdbc/src/test/resources/logback-test.xml

Review comment:
       Please remove this line, it is not needed.

##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -30,10 +30,18 @@
 
   <name>Drill : Contrib : Storage : JDBC</name>
 
+  <repositories>
+    <repository>
+      <id>jitpack.io</id>
+      <url>https://jitpack.io</url>
+    </repository>
+  </repositories>
+

Review comment:
       It is declared in root pom, so no need to declare it here too.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();
+      Statement stmt = conn.createStatement();

Review comment:
       Please wrap it with a try-with-resources.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725875035



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       @cgivre did you see this?  Have we tested CTAS statements with 10k, 100k, 1m records?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-944664618


   > @cgivre Hello Charles. Is it possible to compress the `large_csv.csvh` to zip or tar.gz format ?
   
   Hi @luocooong Are you looking for a test to see if it can go from compressed file to insert?  I added a new test that generates a 100k row CSV file and inserts that.  The only thing is that the test is slow, so I disabled it by default. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r732304828



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try (Statement statement = connection.createStatement()) {
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);

Review comment:
       Fixed I think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725235203



##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -30,10 +30,18 @@
 
   <name>Drill : Contrib : Storage : JDBC</name>
 
+  <repositories>
+    <repository>
+      <id>jitpack.io</id>
+      <url>https://jitpack.io</url>
+    </repository>
+  </repositories>
+

Review comment:
       Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730345453



##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
##########
@@ -210,6 +216,40 @@ public void testWithLargeFile() throws Exception {
     assertTrue(dropResults.succeeded());
   }
 
+  @Test
+  @Ignore("This is a slow test.  Please run manually.")
+  public void testWithReallyLongFile() throws Exception {
+    Path generatedFile = null;
+    try {
+      generatedFile = JdbcTestUtils.generateCsvFile("csv/very_large_file.csvh", 10, 100000);

Review comment:
       Thanks, my issue is gone.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725236660



##########
File path: docs/dev/CreatingAWriter.md
##########
@@ -0,0 +1,2 @@
+# Creating a Writer for a Storage Plugin
+This tutorial explains the mostly undocumented features of how to create a writer for a Drill storage plugin.  

Review comment:
       I'll write up docs for this once the PR is basically ready to go. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725115192



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcDDLQueryUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
+  /**
+   * Converts a given SQL query from the generic dialect to the destination system dialect.  Returns
+   * null if the original query is not valid.
+   *
+   * @param query An ANSI SQL statement
+   * @param dialect The destination system dialect
+   * @return A representation of the original query in the destination dialect
+   */
+  public static String cleanDDLQuery(String query, SqlDialect dialect) {
+    SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
+      .setParserFactory(SqlDdlParserImpl.FACTORY)
+      .setConformance(SqlConformanceEnum.MYSQL_5)
+      .setCaseSensitive(true)
+      .setLex(Lex.MYSQL_ANSI)
+      .build();
+
+    try {
+      SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
+      String cleanSQL =  node.toSqlString(dialect).getSql();
+
+      // TODO Fix this hack
+      // HACK  See CALCITE-4820 (https://issues.apache.org/jira/browse/CALCITE-4820)

Review comment:
       I think yes, it is a quite small fix, so no conflicts should appear.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725724686



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+  public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+  private final StringBuilder createTableQuery;
+  private SqlDialect dialect;
+  private StringBuilder columns;
+
+  public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+    if (Strings.isNullOrEmpty(tableName)) {
+      throw new UnsupportedOperationException("Table name cannot be empty");
+    }
+    this.dialect = dialect;
+    createTableQuery = new StringBuilder();
+    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+    columns = new StringBuilder();
+  }
+
+  /**
+   * Adds a column to the CREATE TABLE statement
+   * @param colName The column to be added to the table
+   * @param type The Drill MinorType of the column
+   * @param nullable If the column is nullable or not.
+   * @param precision The precision, or overall length of a column
+   * @param scale The scale, or number of digits after the decimal
+   */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+    StringBuilder queryText = new StringBuilder();
+    String jdbcColType = "";
+    try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+    } catch (NullPointerException e) {
+      // JDBC Does not support writing complex fields to databases
+      throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+        .addContext(colName + " is a complex type.")
+        .build(logger);
+    }
+
+    queryText.append(colName).append(" ").append(jdbcColType);
+
+    // Add precision or scale if applicable
+    if (jdbcColType.equals("VARCHAR")) {
+      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+      queryText.append("(").append(max_precision).append(")");
+    }
+
+    if (!nullable) {
+      queryText.append(" NOT NULL");
+    }
+
+    if (! Strings.isNullOrEmpty(columns.toString())) {
+      columns.append(",\n");
+    }
+
+    columns.append(queryText);
+  }
+
+  /**
+   * Generates the CREATE TABLE query.
+   * @return The create table query.
+   */
+  public String getCreateTableQuery() {
+    createTableQuery.append(columns);
+    createTableQuery.append("\n)");
+    return createTableQuery.toString();
+  }
+
+  @Override
+  public String toString() {
+    return getCreateTableQuery();
+  }
+
+  /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+   * @param table The table
+   * @param catalog The database catalog
+   * @param schema The database schema
+   * @return The table with catalog and schema added, if present
+   */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-948205950


   @dzamo @vvysotskyi 
   Thank you for your patience.  I've removed the hackery and addressed your review comments. 
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725566154



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();
+      Statement stmt = conn.createStatement();

Review comment:
       Fixed.  Here and elsewhere.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725119233



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+  public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+  private final StringBuilder createTableQuery;
+  private SqlDialect dialect;
+  private StringBuilder columns;
+
+  public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+    if (Strings.isNullOrEmpty(tableName)) {
+      throw new UnsupportedOperationException("Table name cannot be empty");
+    }
+    this.dialect = dialect;
+    createTableQuery = new StringBuilder();
+    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+    columns = new StringBuilder();
+  }
+
+  /**
+   * Adds a column to the CREATE TABLE statement
+   * @param colName The column to be added to the table
+   * @param type The Drill MinorType of the column
+   * @param nullable If the column is nullable or not.
+   * @param precision The precision, or overall length of a column
+   * @param scale The scale, or number of digits after the decimal
+   */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+    StringBuilder queryText = new StringBuilder();
+    String jdbcColType = "";
+    try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+    } catch (NullPointerException e) {
+      // JDBC Does not support writing complex fields to databases
+      throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+        .addContext(colName + " is a complex type.")
+        .build(logger);
+    }
+
+    queryText.append(colName).append(" ").append(jdbcColType);
+
+    // Add precision or scale if applicable
+    if (jdbcColType.equals("VARCHAR")) {
+      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+      queryText.append("(").append(max_precision).append(")");
+    }
+
+    if (!nullable) {
+      queryText.append(" NOT NULL");
+    }
+
+    if (! Strings.isNullOrEmpty(columns.toString())) {
+      columns.append(",\n");
+    }
+
+    columns.append(queryText);
+  }
+
+  /**
+   * Generates the CREATE TABLE query.
+   * @return The create table query.
+   */
+  public String getCreateTableQuery() {
+    createTableQuery.append(columns);
+    createTableQuery.append("\n)");
+    return createTableQuery.toString();
+  }
+
+  @Override
+  public String toString() {
+    return getCreateTableQuery();
+  }
+
+  /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+   * @param table The table
+   * @param catalog The database catalog
+   * @param schema The database schema
+   * @return The table with catalog and schema added, if present
+   */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {

Review comment:
       Thanks for the comment.  I'll add a unit test for this use case. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724060422



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+  public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+  private final StringBuilder createTableQuery;
+  private SqlDialect dialect;
+  private StringBuilder columns;
+
+  public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+    if (Strings.isNullOrEmpty(tableName)) {
+      throw new UnsupportedOperationException("Table name cannot be empty");
+    }
+    this.dialect = dialect;
+    createTableQuery = new StringBuilder();
+    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+    columns = new StringBuilder();
+  }
+
+  /**
+   * Adds a column to the CREATE TABLE statement
+   * @param colName The column to be added to the table
+   * @param type The Drill MinorType of the column
+   * @param nullable If the column is nullable or not.
+   * @param precision The precision, or overall length of a column
+   * @param scale The scale, or number of digits after the decimal
+   */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+    StringBuilder queryText = new StringBuilder();
+    String jdbcColType = "";
+    try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+    } catch (NullPointerException e) {
+      // JDBC Does not support writing complex fields to databases
+      throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+        .addContext(colName + " is a complex type.")
+        .build(logger);
+    }
+
+    queryText.append(colName).append(" ").append(jdbcColType);
+
+    // Add precision or scale if applicable
+    if (jdbcColType.equals("VARCHAR")) {
+      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+      queryText.append("(").append(max_precision).append(")");
+    }
+
+    if (!nullable) {
+      queryText.append(" NOT NULL");
+    }
+
+    if (! Strings.isNullOrEmpty(columns.toString())) {
+      columns.append(",\n");
+    }
+
+    columns.append(queryText);
+  }
+
+  /**
+   * Generates the CREATE TABLE query.
+   * @return The create table query.
+   */
+  public String getCreateTableQuery() {
+    createTableQuery.append(columns);
+    createTableQuery.append("\n)");
+    return createTableQuery.toString();
+  }
+
+  @Override
+  public String toString() {
+    return getCreateTableQuery();
+  }
+
+  /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+   * @param table The table
+   * @param catalog The database catalog
+   * @param schema The database schema
+   * @return The table with catalog and schema added, if present
+   */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {

Review comment:
       Quite often RDBMSes allow spaces, and possibly other tricky characters, to be used in table names.  Trouble is that they differ in how they want want such identifiers enclosed in those cases e.g. `[spaced out table]` vs ````spaced out table````.  Do we want to raise an error or warning here if e.g. a regexp sees some of these characters?  Otherwise I think later `INSERT` statements could break.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r733973925



##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-server</artifactId>
+      <version>1.21.0-drill-r5</version>

Review comment:
       Please use property from root pom instead of hardcoding the version...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-949248528


   @dzamo 
   Per your request, I thought about this some more and added the ability to configure the batch size for the `INSERT` queries.  What happens now is that the user can set the batch size depending on their environment and the database to which they are inserting data.  
   
   The unit tests pass and I ran this locally with a 1M row CSV insert into a MySQL database which worked perfectly.  Previously, this ran into the `max_packet_size` limit in MySQL, but now this is not an issue. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-949391299


   > @dzamo Per your request, I thought about this some more and added the ability to configure the batch size for the `INSERT` queries. What happens now is that the user can set the batch size depending on their environment and the database to which they are inserting data.
   
   @cgivre this is great.   I thought of one more possible optimisation: creating a parameterised INSERT PreparedStatement of writer_batch_size rows and reusing it for as long as there are >= writer_batch_size rows remaining to insert.  I don't know Calcite stuff but I can say I saw a class called SqlDynamicParam in it.  This would mean that the receiving DBMS does not need to parse a very long INSERT statement at the start of every batch, a noticeable saving of memory and CPU time for it I would guess.  Just a possible optimisation I wanted to share, I view it as something that can also come in a later version.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725724741



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();

Review comment:
       Addressed in try-with-resources.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725724928



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {

Review comment:
       Yeah...the methods really make more sense if you view them as being written for a file-based system.  The `cleanup` function is basically what gets executed at the end. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-943275706


   Or we can use a cross join in a query based on a small CSV file of n rows to get n² rows. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2327:
URL: https://github.com/apache/drill/pull/2327#issuecomment-943230384


   @cgivre Hello Charles. Is it possible to compress the `large_csv.csvh` to zip or tar.gz format ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r725110144



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcDDLQueryUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
+  /**
+   * Converts a given SQL query from the generic dialect to the destination system dialect.  Returns
+   * null if the original query is not valid.
+   *
+   * @param query An ANSI SQL statement
+   * @param dialect The destination system dialect
+   * @return A representation of the original query in the destination dialect
+   */
+  public static String cleanDDLQuery(String query, SqlDialect dialect) {
+    SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
+      .setParserFactory(SqlDdlParserImpl.FACTORY)
+      .setConformance(SqlConformanceEnum.MYSQL_5)
+      .setCaseSensitive(true)
+      .setLex(Lex.MYSQL_ANSI)
+      .build();
+
+    try {
+      SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
+      String cleanSQL =  node.toSqlString(dialect).getSql();
+
+      // TODO Fix this hack
+      // HACK  See CALCITE-4820 (https://issues.apache.org/jira/browse/CALCITE-4820)

Review comment:
       I didn't see the response.  
   @vvysotskyi would it be possible to merge https://github.com/apache/calcite/pull/1568 to the Drill calcite?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r732144502



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)

Review comment:
       Not at this time.  That's noted in the docs.  My reason was that most JDBC databases don't support those types.  I was thinking that maybe we could convert lists and maps into strings.  Maybe if there is interest.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724922193



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       I think that the maximum number of records DBMSes allow in a `VALUES` expression is commonly order 1e3 to 1e4.  If Drill batch sizes can exceed that we're going to have a problem.  A possible solution is to always partition into conservative insert batches of, say 500 records.  The `PreparedStatement` and `executeBatch` JDBC API usage in this answer https://stackoverflow.com/a/3786127/1153953 might help to keep things as efficient as possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730233364



##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
##########
@@ -370,22 +372,73 @@ public void testWithArrayField() throws Exception {
     }
   }
 
+  @Test
+  public void testWithLargeFile() throws Exception {
+    String query = "CREATE TABLE h2.tmp.`drill_h2_test`.`t2` (id,first_name,last_name,email,gender,ip_address) AS " +
+      "SELECT id,first_name,last_name,email,gender,ip_address FROM cp.`csv/large_csv.csvh`";
+    QuerySummary insertResults = queryBuilder().sql(query).run();
+    assertTrue(insertResults.succeeded());
+
+    query = "SELECT COUNT(*) FROM h2.tmp.`drill_h2_test`.`t2`";
+    long rowCount = queryBuilder().sql(query).singletonLong();
+    assertEquals(6000, rowCount);
+
+    // Now drop the table
+    String dropQuery = "DROP TABLE h2.tmp.`drill_h2_test`.`t2`";
+    QuerySummary dropResults = queryBuilder().sql(dropQuery).run();
+    assertTrue(dropResults.succeeded());
+  }
+
+  @Test
+  @Ignore("This is a slow test.  Please run manually.")

Review comment:
       Thanks for the explanation.

##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
##########
@@ -210,6 +216,40 @@ public void testWithLargeFile() throws Exception {
     assertTrue(dropResults.succeeded());
   }
 
+  @Test
+  @Ignore("This is a slow test.  Please run manually.")
+  public void testWithReallyLongFile() throws Exception {
+    Path generatedFile = null;
+    try {
+      generatedFile = JdbcTestUtils.generateCsvFile("csv/very_large_file.csvh", 10, 100000);

Review comment:
       As an option, can we compress the csvh file to tar.gz format then read it ? Just to reduce the size of code base.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730405118



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcCatalogSchema.java
##########
@@ -56,7 +56,7 @@
       while (set.next()) {
         final String catalogName = set.getString(1);
         CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
-            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive);
+            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive, convention.getPlugin());

Review comment:
       Done.  I had to make change `CapitalizingJdbcPlugin` to require a `DrillJdbcConvention` rather than the Calcite `JdbcConvention`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r726032088



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       @dzamo 
   This is a good question.  What is supposed to happen is that inserts actually happen in batches.   Any suggestions as to how to test?  Do you think I should just generate a CSV file with 1M records and see what happens?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730462584



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)

Review comment:
       Do we support repeated types or lists of lists and so on?

##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>
+      <artifactId>calcite-server</artifactId>

Review comment:
       Could you please explain why this dependency is required here?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()

Review comment:
       Please combine it with a declaration instead of a static block.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try (Statement statement = connection.createStatement()) {
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);

Review comment:
       Please wrap the statement into the try-with-resources block and add a closing connection to the `finally` block, since, for the case of exception during statement execution, it wouldn't be closed...

##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>

Review comment:
       ```suggestion
         <groupId>${calcite.groupId}</groupId>
   ```

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try (Statement statement = connection.createStatement()) {
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString

Review comment:
       Can't we use Calcite's `JdbcTableModify` to create insert statement string instead of using custom logic?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r732304699



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = "UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org