You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/10/08 11:15:50 UTC

[GitHub] [drill] dzamo commented on a change in pull request #2327: DRILL-8005: Add Writer to JDBC Storage Plugin

dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724060422



##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+  public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+  private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+  private final StringBuilder createTableQuery;
+  private SqlDialect dialect;
+  private StringBuilder columns;
+
+  public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+    if (Strings.isNullOrEmpty(tableName)) {
+      throw new UnsupportedOperationException("Table name cannot be empty");
+    }
+    this.dialect = dialect;
+    createTableQuery = new StringBuilder();
+    createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+    columns = new StringBuilder();
+  }
+
+  /**
+   * Adds a column to the CREATE TABLE statement
+   * @param colName The column to be added to the table
+   * @param type The Drill MinorType of the column
+   * @param nullable If the column is nullable or not.
+   * @param precision The precision, or overall length of a column
+   * @param scale The scale, or number of digits after the decimal
+   */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+    StringBuilder queryText = new StringBuilder();
+    String jdbcColType = "";
+    try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+    } catch (NullPointerException e) {
+      // JDBC Does not support writing complex fields to databases
+      throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+        .addContext(colName + " is a complex type.")
+        .build(logger);
+    }
+
+    queryText.append(colName).append(" ").append(jdbcColType);
+
+    // Add precision or scale if applicable
+    if (jdbcColType.equals("VARCHAR")) {
+      int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+      queryText.append("(").append(max_precision).append(")");
+    }
+
+    if (!nullable) {
+      queryText.append(" NOT NULL");
+    }
+
+    if (! Strings.isNullOrEmpty(columns.toString())) {
+      columns.append(",\n");
+    }
+
+    columns.append(queryText);
+  }
+
+  /**
+   * Generates the CREATE TABLE query.
+   * @return The create table query.
+   */
+  public String getCreateTableQuery() {
+    createTableQuery.append(columns);
+    createTableQuery.append("\n)");
+    return createTableQuery.toString();
+  }
+
+  @Override
+  public String toString() {
+    return getCreateTableQuery();
+  }
+
+  /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+   * @param table The table
+   * @param catalog The database catalog
+   * @param schema The database schema
+   * @return The table with catalog and schema added, if present
+   */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {

Review comment:
       Quite often RDBMSes allow spaces, and possibly other tricky characters, to be used in table names.  Trouble is that they differ in how they want want such identifiers enclosed in those cases e.g. `[spaced out table]` vs ```spaced out table```.  Do we want to raise an error or warning here if e.g. a regexp sees some of these characters?  Otherwise I think later `INSERT` statements could break.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
     return inner.getTableNames();
   }
 
+
+  @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+    if (! plugin.getConfig().isWritable()) {
+      throw UserException
+        .dataWriteError()
+        .message(plugin.getName() + " is not writable.")
+        .build(logger);
+    }
+
+    return new CreateTableEntry() {
+
+      @Override
+      public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+        return new JdbcWriter(child, tableWithSchema, inner, plugin);
+      }
+
+      @Override
+      public List<String> getPartitionColumns() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+    String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+    try {
+      Connection conn = inner.getDataSource().getConnection();

Review comment:
       Do we need a `finally { conn.close() }` for this?

##########
File path: contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * JDBC storage plugin tests against Postgres.
+ */
+@Category(JdbcStorageTest.class)
+public class TestJdbcPluginWithPostgres extends ClusterTest {

Review comment:
       The contents of this class look like they're not writer-related and should have been there before...  was it just something we didn't have?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcCatalogSchema.java
##########
@@ -56,7 +56,7 @@
       while (set.next()) {
         final String catalogName = set.getString(1);
         CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
-            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive);
+            getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive, convention.getPlugin());

Review comment:
       If CapitalizingJdbcSchema can only correctly use the JdbcStoragePlugin returned by `convention.getPlugin()` then I'd consider not adding to the constructor args here, but letting CapitalizingJdbcSchema do the lookup `convention.getPlugin()` itself, in the constructor.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcDDLQueryUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
+  /**
+   * Converts a given SQL query from the generic dialect to the destination system dialect.  Returns
+   * null if the original query is not valid.
+   *
+   * @param query An ANSI SQL statement
+   * @param dialect The destination system dialect
+   * @return A representation of the original query in the destination dialect
+   */
+  public static String cleanDDLQuery(String query, SqlDialect dialect) {
+    SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
+      .setParserFactory(SqlDdlParserImpl.FACTORY)
+      .setConformance(SqlConformanceEnum.MYSQL_5)
+      .setCaseSensitive(true)
+      .setLex(Lex.MYSQL_ANSI)
+      .build();
+
+    try {
+      SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
+      String cleanSQL =  node.toSqlString(dialect).getSql();
+
+      // TODO Fix this hack
+      // HACK  See CALCITE-4820 (https://issues.apache.org/jira/browse/CALCITE-4820)

Review comment:
       Did you see the response from the Calcite team on CALCITE-4820?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {
+    StringBuilder values = new StringBuilder();
+    for (int i = 0; i < insertRows.size(); i++) {
+      if (i > 0) {
+        values.append(",\n");
+      }
+      values.append(insertRows.get(i));
+    }
+
+    String sql = String.format(INSERT_QUERY_TEMPLATE, tableName, values);
+    return JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+  }
+
+  private String formatDateForInsertQuery(Long dateVal) {
+    Date date=new Date(dateVal);
+    SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
+    return df2.format(date);
+  }
+
+  private String formatTimeForInsertQuery(Integer millis) {
+    return String.format("%02d:%02d:%02d", TimeUnit.MILLISECONDS.toHours(millis),
+      TimeUnit.MILLISECONDS.toMinutes(millis) % TimeUnit.HOURS.toMinutes(1),
+      TimeUnit.MILLISECONDS.toSeconds(millis) % TimeUnit.MINUTES.toSeconds(1));
+  }
+
+  private String formatTimeStampForInsertQuery(Long time) {
+    Date date = new Date(time);
+    Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    return format.format(date);
+  }
+
+  @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new NullableIntJDBCConverter(fieldId, fieldName, reader, fields);
+  }
+
+  public class NullableIntJDBCConverter extends FieldConverter {

Review comment:
       I guess maybe we could do something with a Freemarker template for the converters but I'm not convinced it's worth it now that we already have these written.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {

Review comment:
       It feels weird that we do the actual inserting in a method called `cleanup` - is this the right place?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       I think that the maximum number of records DBMSes allow in a `VALUES` expression is order 1e3 to 1e4.  If Drill batch sizes can exceed that we're going to have a problem.  A possible solution is to always partition into conservative insert batches of, say 500 records.  The `PreparedStatement` and `executeBatch` JDBC API usage in this answer https://stackoverflow.com/a/3786127/1153953 might help to keep things as efficient as possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org