You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/09/14 23:33:22 UTC
[drill] branch master updated: DRILL-8303: Add support for inserts into JDBC storage (#2646)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 975b7d60a6 DRILL-8303: Add support for inserts into JDBC storage (#2646)
975b7d60a6 is described below
commit 975b7d60a6520e1ff2ba17bbae7347739ed2e628
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Sep 15 02:33:14 2022 +0300
DRILL-8303: Add support for inserts into JDBC storage (#2646)
* DRILL-8303: Add support for inserts into JDBC storage
---
.../exec/store/jdbc/CapitalizingJdbcSchema.java | 39 +-
.../drill/exec/store/jdbc/DrillJdbcConvention.java | 41 +-
.../drill/exec/store/jdbc/JdbcBatchReader.java | 73 ++-
.../{JdbcWriter.java => JdbcInsertWriter.java} | 65 +-
...ator.java => JdbcInsertWriterBatchCreator.java} | 19 +-
.../drill/exec/store/jdbc/JdbcRecordWriter.java | 716 ++++-----------------
.../drill/exec/store/jdbc/JdbcStoragePlugin.java | 32 +-
.../exec/store/jdbc/JdbcTableModifyWriter.java | 35 +
.../apache/drill/exec/store/jdbc/JdbcWriter.java | 23 +-
.../exec/store/jdbc/JdbcWriterBatchCreator.java | 3 +-
.../store/jdbc/utils/CreateTableStmtBuilder.java | 164 ++---
.../store/jdbc/utils/InsertStatementBuilder.java | 73 +++
.../exec/store/jdbc/utils/JdbcDDLQueryUtils.java | 114 ----
.../store/jdbc/TestCreateTableStmtBuilder.java | 51 --
.../exec/store/jdbc/TestJdbcInsertWithH2.java | 247 +++++++
.../exec/store/jdbc/TestJdbcInsertWithMySQL.java | 281 ++++++++
.../store/jdbc/TestJdbcInsertWithPostgres.java | 253 ++++++++
.../exec/store/jdbc/TestJdbcWriterWithH2.java | 13 +-
.../exec/store/jdbc/TestJdbcWriterWithMySQL.java | 23 +-
.../store/jdbc/TestJdbcWriterWithPostgres.java | 17 +-
.../org/apache/drill/exec/ops/QueryContext.java | 2 +-
.../physical/base/AbstractPhysicalVisitor.java | 6 +
.../drill/exec/physical/base/PhysicalVisitor.java | 2 +
.../drill/exec/physical/config/TableModify.java | 56 ++
.../physical/impl/InsertWriterRecordBatch.java | 59 ++
.../exec/physical/impl/WriterRecordBatch.java | 30 +-
.../apache/drill/exec/planner/PlannerPhase.java | 6 +-
.../drill/exec/planner/common/DrillRelOptUtil.java | 2 +-
.../generators/NonCoveringIndexPlanGenerator.java | 2 +-
.../exec/planner/logical/DrillTableModify.java | 66 ++
.../exec/planner/logical/DrillTableModifyRule.java | 52 ++
.../exec/planner/logical/ModifyTableEntry.java | 29 +
.../drill/exec/planner/physical/ScreenPrule.java | 4 +-
.../exec/planner/physical/TableModifyPrel.java | 112 ++++
.../{ScreenPrule.java => TableModifyPrule.java} | 34 +-
.../planner/physical/visitor/BasePrelVisitor.java | 6 +
.../exec/planner/physical/visitor/PrelVisitor.java | 2 +
.../physical/visitor/PrelVisualizerVisitor.java | 8 +
.../drill/exec/planner/sql/DrillSqlWorker.java | 21 +-
.../exec/planner/sql/handlers/InsertHandler.java | 61 ++
.../planner/sql/handlers/SqlHandlerConfig.java | 26 +-
.../apache/drill/exec/store/AbstractSchema.java | 13 +
.../drill/exec/store/AbstractStoragePlugin.java | 5 +
.../org/apache/drill/exec/store/StoragePlugin.java | 5 +
.../drill/common/logical/data/InsertWriter.java | 31 +
45 files changed, 1895 insertions(+), 1027 deletions(-)
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 09b09873a9..713653e743 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -18,10 +18,10 @@
package org.apache.drill.exec.store.jdbc;
import javax.sql.DataSource;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -34,14 +34,18 @@ import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Writer;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
+import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.StorageStrategy;
-import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
-import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,9 +126,8 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
return new CreateTableEntry() {
@Override
- public Writer getWriter(PhysicalOperator child) throws IOException {
- String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
- return new JdbcWriter(child, tableWithSchema, inner, plugin);
+ public Writer getWriter(PhysicalOperator child) {
+ return new JdbcWriter(child, getFullTablePath(tableName), inner, plugin);
}
@Override
@@ -134,6 +137,11 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
};
}
+ @Override
+ public ModifyTableEntry modifyTable(String tableName) {
+ return child -> new JdbcInsertWriter(child, getFullTablePath(tableName), inner, plugin);
+ }
+
@Override
public void dropTable(String tableName) {
if (! plugin.getConfig().isWritable()) {
@@ -143,10 +151,11 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
.build(logger);
}
- String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
- String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+ List<String> names = getFullTablePath(tableName);
SqlDialect dialect = plugin.getDialect(inner.getDataSource());
- dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, dialect);
+ String dropTableQuery = SqlDropTable.OPERATOR.createCall(
+ SqlParserPos.ZERO, new SqlIdentifier(names, SqlParserPos.ZERO), SqlLiteral.createBoolean(false, SqlParserPos.ZERO))
+ .toSqlString(dialect).getSql();
try (Connection conn = inner.getDataSource().getConnection();
Statement stmt = conn.createStatement()) {
@@ -167,6 +176,18 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
}
}
+ private List<String> getFullTablePath(String tableName) {
+ List<String> names = new ArrayList<>();
+ if (!StringUtils.isEmpty(catalog)) {
+ names.add(catalog);
+ }
+ if (!StringUtils.isEmpty(schema)) {
+ names.add(schema);
+ }
+ names.add(tableName);
+ return names;
+ }
+
@Override
public boolean isMutable() {
return plugin.getConfig().isWritable();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 3639628b2c..09ccb57238 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.jdbc;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
import org.apache.calcite.adapter.jdbc.JdbcConvention;
import org.apache.calcite.adapter.jdbc.JdbcRules;
@@ -31,7 +31,9 @@ import org.apache.calcite.linq4j.tree.ConstantUntypedNull;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.sql.SqlDialect;
import org.apache.drill.exec.planner.RuleInstance;
import org.apache.drill.exec.planner.logical.DrillRel;
@@ -40,6 +42,7 @@ import org.apache.drill.exec.store.enumerable.plan.DrillJdbcRuleBase;
import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
import org.apache.drill.exec.store.jdbc.rules.JdbcLimitRule;
import org.apache.drill.exec.store.jdbc.rules.JdbcSortRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
/**
@@ -55,22 +58,16 @@ public class DrillJdbcConvention extends JdbcConvention {
private final ImmutableSet<RelOptRule> rules;
private final JdbcStoragePlugin plugin;
- private final String username;
DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
super(dialect, ConstantUntypedNull.INSTANCE, name);
this.plugin = plugin;
- this.username = username;
- List<RelOptRule> calciteJdbcRules = JdbcRules.rules(this, DrillRelFactories.LOGICAL_BUILDER).stream()
- .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
- .collect(Collectors.toList());
List<RelTrait> inputTraits = Arrays.asList(
Convention.NONE,
DrillRel.DRILL_LOGICAL);
ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
- .addAll(calciteJdbcRules)
.add(new JdbcIntermediatePrelConverterRule(this, username))
.add(VertexDrelConverterRule.create(this))
.add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
@@ -81,6 +78,9 @@ public class DrillJdbcConvention extends JdbcConvention {
.add(new DrillJdbcRuleBase.DrillJdbcFilterRule(inputTrait, this))
.add(new JdbcSortRule(inputTrait, this))
.add(new JdbcLimitRule(inputTrait, this));
+ rules(inputTrait, this).stream()
+ .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
+ .forEach(builder::add);
}
this.rules = builder.build();
@@ -98,4 +98,31 @@ public class DrillJdbcConvention extends JdbcConvention {
public JdbcStoragePlugin getPlugin() {
return plugin;
}
+
+ private static List<RelOptRule> rules(
+ RelTrait inputTrait, JdbcConvention out) {
+ ImmutableList.Builder<RelOptRule> b = ImmutableList.builder();
+ foreachRule(out, r ->
+ b.add(r.config
+ .as(ConverterRule.Config.class)
+ .withConversion(r.getOperand().getMatchedClass(), inputTrait, out, r.config.description())
+ .withRelBuilderFactory(DrillRelFactories.LOGICAL_BUILDER)
+ .toRule()));
+ return b.build();
+ }
+
+ private static void foreachRule(JdbcConvention out,
+ Consumer<RelRule<?>> consumer) {
+ consumer.accept(JdbcToEnumerableConverterRule.create(out));
+ consumer.accept(JdbcRules.JdbcJoinRule.create(out));
+ consumer.accept(JdbcProjectRule.create(out));
+ consumer.accept(JdbcFilterRule.create(out));
+ consumer.accept(JdbcRules.JdbcAggregateRule.create(out));
+ consumer.accept(JdbcRules.JdbcSortRule.create(out));
+ consumer.accept(JdbcRules.JdbcUnionRule.create(out));
+ consumer.accept(JdbcRules.JdbcIntersectRule.create(out));
+ consumer.accept(JdbcRules.JdbcMinusRule.create(out));
+ consumer.accept(JdbcRules.JdbcTableModificationRule.create(out));
+ consumer.accept(JdbcRules.JdbcValuesRule.create(out));
+ }
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
index 178a929442..30270d6664 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
@@ -67,6 +67,7 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
private Connection connection;
private PreparedStatement statement;
private ResultSet resultSet;
+ private Integer updateCount;
private RowSetLoader rowWriter;
private CustomErrorContext errorContext;
private List<JdbcColumnWriter> columnWriters;
@@ -130,10 +131,15 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
this.errorContext = negotiator.parentErrorContext();
try {
connection = source.getConnection();
+ TupleMetadata drillSchema;
statement = connection.prepareStatement(sql);
- resultSet = statement.executeQuery();
-
- TupleMetadata drillSchema = buildSchema();
+ if (statement.execute()) {
+ resultSet = statement.getResultSet();
+ drillSchema = buildSchema();
+ } else {
+ updateCount = statement.getUpdateCount();
+ drillSchema = buildUpdateQuerySchema();
+ }
negotiator.tableSchema(drillSchema, true);
ResultSetLoader resultSetLoader = negotiator.build();
@@ -164,15 +170,11 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
private boolean processRow() {
try {
- if (!resultSet.next()) {
- return false;
- }
- rowWriter.start();
- // Process results
- for (JdbcColumnWriter writer : columnWriters) {
- writer.load(resultSet);
+ if (resultSet != null) {
+ return processResultSetRow();
+ } else {
+ return processUpdateRow();
}
- rowWriter.save();
} catch (SQLException e) {
throw UserException
.dataReadError(e)
@@ -181,7 +183,29 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
.addContext(errorContext)
.build(logger);
}
+ }
+
+ private boolean processResultSetRow() throws SQLException {
+ if (!resultSet.next()) {
+ return false;
+ }
+ rowWriter.start();
+ // Process results
+ for (JdbcColumnWriter writer : columnWriters) {
+ writer.load(resultSet);
+ }
+ rowWriter.save();
+ return true;
+ }
+ private boolean processUpdateRow() {
+ if (updateCount == null) {
+ return false;
+ }
+ rowWriter.start();
+ rowWriter.scalar(columns.get(0).getRootSegmentPath()).setLong(updateCount);
+ rowWriter.save();
+ updateCount = null;
return true;
}
@@ -245,6 +269,33 @@ public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
return builder.buildSchema();
}
+ private TupleMetadata buildUpdateQuerySchema() throws SQLException {
+ if (columns.size() != 1) {
+ throw UserException
+ .validationError()
+ .message(
+ "Expected columns count differs from the returned one.\n" +
+ "Expected columns: %s\n" +
+ "Returned columns count: %s",
+ columns, 1)
+ .addContext("Sql", sql)
+ .addContext(errorContext)
+ .build(logger);
+ }
+
+ String name = columns.get(0).getRootSegmentPath();
+ int width = DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision();
+ int scale = 0;
+
+ MinorType minorType = MinorType.BIGINT;
+
+ jdbcColumns = new ArrayList<>();
+ jdbcColumns.add(new JdbcColumn(name, minorType, 0, scale, width));
+ return new SchemaBuilder()
+ .addNullable(name, minorType, width, scale)
+ .buildSchema();
+ }
+
private void populateWriterArray() {
columnWriters = new ArrayList<>();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
similarity index 51%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
copy to contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
index 05924a1d80..e691f3b328 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriter.java
@@ -17,47 +17,31 @@
*/
package org.apache.drill.exec.store.jdbc;
-import java.io.IOException;
-
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class JdbcWriter extends AbstractWriter {
-
- public static final String OPERATOR_TYPE = "JDBC_WRITER";
+import java.util.List;
- private final JdbcStoragePlugin plugin;
- private final String tableName;
- private final JdbcSchema inner;
+public class JdbcInsertWriter extends JdbcWriter {
+ public static final String OPERATOR_TYPE = "JDBC_INSERT_WRITER";
@JsonCreator
- public JdbcWriter(
+ public JdbcInsertWriter(
@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("name") String name,
+ @JsonProperty("tableIdentifier") List<String> tableIdentifier,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JacksonInject JdbcSchema inner,
- @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
- super(child);
- this.plugin = engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class);
- this.tableName = name;
- this.inner = inner;
+ @JacksonInject StoragePluginRegistry engineRegistry) {
+ super(child, tableIdentifier, inner, engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class));
}
- JdbcWriter(PhysicalOperator child, String name, JdbcSchema inner, JdbcStoragePlugin plugin) {
- super(child);
- this.tableName = name;
- this.plugin = plugin;
- this.inner = inner;
+ JdbcInsertWriter(PhysicalOperator child, List<String> tableIdentifier, JdbcSchema inner, JdbcStoragePlugin plugin) {
+ super(child, tableIdentifier, inner, plugin);
}
@Override
@@ -67,30 +51,7 @@ public class JdbcWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new JdbcWriter(child, tableName, inner, plugin);
+ return new JdbcInsertWriter(child, getTableIdentifier(), getInner(), getPlugin());
}
- public String getTableName() {
- return tableName;
- }
-
- public StoragePluginConfig getStorage() {
- return plugin.getConfig();
- }
-
- @JsonIgnore
- public JdbcSchema getInner() { return inner; }
-
- @JsonIgnore
- public JdbcStoragePlugin getPlugin() {
- return plugin;
- }
-
- @Override
- public String toString() {
- return new PlanStringBuilder(this)
- .field("tableName", tableName)
- .field("storageStrategy", getStorageStrategy())
- .toString();
- }
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
similarity index 83%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
copy to contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
index 182c104714..1c86e6447d 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
@@ -17,22 +17,23 @@
*/
package org.apache.drill.exec.store.jdbc;
-import java.util.List;
-
-import javax.sql.DataSource;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.physical.impl.InsertWriterRecordBatch;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
+import javax.sql.DataSource;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class JdbcInsertWriterBatchCreator implements BatchCreator<JdbcInsertWriter> {
@Override
- public CloseableRecordBatch getBatch(ExecutorFragmentContext context, JdbcWriter config, List<RecordBatch> children)
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ JdbcInsertWriter config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
@@ -44,11 +45,11 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
config.getPlugin().getName()
)));
- return new WriterRecordBatch(
+ return new InsertWriterRecordBatch(
config,
children.iterator().next(),
context,
- new JdbcRecordWriter(ds, null, config.getTableName(), config)
+ new JdbcTableModifyWriter(ds, config.getTableIdentifier(), config)
);
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index e403e4f09d..ebe907392a 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -19,62 +19,32 @@
package org.apache.drill.exec.store.jdbc;
import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
-import org.apache.calcite.sql.util.SqlBuilder;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.BitHolder;
-import org.apache.drill.exec.expr.holders.DateHolder;
-import org.apache.drill.exec.expr.holders.Float4Holder;
-import org.apache.drill.exec.expr.holders.Float8Holder;
-import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableDateHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
-import org.apache.drill.exec.expr.holders.NullableTimeHolder;
-import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
-import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
-import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
-import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
-import org.apache.drill.exec.expr.holders.SmallIntHolder;
-import org.apache.drill.exec.expr.holders.TimeHolder;
-import org.apache.drill.exec.expr.holders.TimeStampHolder;
-import org.apache.drill.exec.expr.holders.TinyIntHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.expr.holders.VarDecimalHolder;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.AbstractRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
-import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
-import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.store.jdbc.utils.InsertStatementBuilder;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.io.IOException;
-import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
+import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -83,51 +53,19 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
- private final String tableName;
- private Connection connection;
- private final SqlDialect dialect;
- private final List<Object> rowList;
- private final List<JdbcWriterField> fields;
- private final String rawTableName;
+ private final List<String> tableIdentifier;
+ private final Connection connection;
+ protected final SqlDialect dialect;
+ private final InsertStatementBuilder insertStatementBuilder;
private final JdbcWriter config;
- private SqlBuilder insertQueryBuilder;
- private boolean firstRecord;
private int recordCount;
- /*
- * This map maps JDBC data types to their Drill equivalents. The basic strategy is that if there
- * is a Drill equivalent, then do the mapping as expected.
- *
- * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
- * mapped to VARBINARY.
- */
- public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
- .put(MinorType.FLOAT8, java.sql.Types.DOUBLE)
- .put(MinorType.FLOAT4, java.sql.Types.FLOAT)
- .put(MinorType.TINYINT, java.sql.Types.TINYINT)
- .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
- .put(MinorType.INT, java.sql.Types.INTEGER)
- .put(MinorType.BIGINT, java.sql.Types.BIGINT)
- .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
- .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
- .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
- .put(MinorType.DATE, java.sql.Types.DATE)
- .put(MinorType.TIME, java.sql.Types.TIME)
- .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
- .put(MinorType.BIT, java.sql.Types.BOOLEAN)
- .build();
-
- public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
- this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
- this.rowList = new ArrayList<>();
+ public JdbcRecordWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+ this.tableIdentifier = tableIdentifier;
this.dialect = config.getPlugin().getDialect(source);
this.config = config;
- this.rawTableName = name;
- this.fields = new ArrayList<>();
- this.firstRecord = true;
this.recordCount = 0;
-
- this.insertQueryBuilder = initializeInsertQuery();
+ this.insertStatementBuilder = getInsertStatementBuilder(tableIdentifier);
try {
this.connection = source.getConnection();
@@ -139,6 +77,10 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
}
}
+ protected InsertStatementBuilder getInsertStatementBuilder(List<String> tableIdentifier) {
+ return new InsertStatementBuilder(tableIdentifier, dialect);
+ }
+
@Override
public void init(Map<String, String> writerOptions) {
// Nothing to see here...
@@ -147,35 +89,20 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
@Override
public void updateSchema(VectorAccessible batch) {
BatchSchema schema = batch.getSchema();
- String columnName;
- MinorType type;
- String sql;
- boolean nullable = false;
- CreateTableStmtBuilder queryBuilder = new CreateTableStmtBuilder(tableName, dialect);
-
+ CreateTableStmtBuilder queryBuilder = new CreateTableStmtBuilder(tableIdentifier, dialect);
for (MaterializedField field : schema) {
- columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
- type = field.getType().getMinorType();
- logger.debug("Adding column {} of type {}.", columnName, type);
+ logger.debug("Adding column {} of type {}.", field.getName(), field.getType().getMinorType());
if (field.getType().getMode() == DataMode.REPEATED) {
throw UserException.dataWriteError()
- .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+ .message("Drill does not yet support writing arrays to JDBC. " + field.getName() + " is an array.")
.build(logger);
}
- if (field.getType().getMode() == DataMode.OPTIONAL) {
- nullable = true;
- }
-
- int precision = field.getPrecision();
- int scale = field.getScale();
-
- queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+ queryBuilder.addColumn(field);
}
- sql = queryBuilder.build().getCreateTableQuery();
- sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+ String sql = queryBuilder.build();
logger.debug("Final query: {}", sql);
// Execute the query to build the schema
@@ -192,83 +119,26 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
@Override
public void startRecord() {
- rowList.clear();
+ insertStatementBuilder.resetRow();
- if (!firstRecord) {
- insertQueryBuilder.append(",");
- }
- insertQueryBuilder.append("(");
logger.debug("Start record");
}
@Override
public void endRecord() throws IOException {
logger.debug("Ending record");
-
- // Add values to rowString
- for (int i = 0; i < rowList.size(); i++) {
- if (i > 0) {
- insertQueryBuilder.append(",");
- }
-
- // Add null value to rowstring
- if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
- insertQueryBuilder.append("null");
- continue;
- }
-
- JdbcWriterField currentField = fields.get(i);
- if (currentField.getDataType() == MinorType.VARCHAR) {
- String value = null;
- // Get the string value
- if (currentField.getMode() == DataMode.REQUIRED) {
- VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
- value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
- } else {
- try {
- NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
- value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
- } catch (ClassCastException e) {
- logger.error("Unable to read field: {}", rowList.get(i));
- }
- }
-
- // Add to value string
- insertQueryBuilder.literal(value);
- } else if (currentField.getDataType() == MinorType.DATE) {
- String dateString = formatDateForInsertQuery((Long) rowList.get(i));
- insertQueryBuilder.literal(dateString);
- } else if (currentField.getDataType() == MinorType.TIME) {
- String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
- insertQueryBuilder.literal(timeString);
- } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
- String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
- insertQueryBuilder.literal(timeString);
- } else {
- if (Strings.isNullOrEmpty(rowList.get(i).toString())) {
- insertQueryBuilder.append("null");
- } else {
- insertQueryBuilder.append(rowList.get(i).toString());
- }
- }
- }
+ insertStatementBuilder.endRecord();
recordCount++;
- firstRecord = false;
- insertQueryBuilder.append(")");
if (recordCount >= config.getPlugin().getConfig().getWriterBatchSize()) {
- // Execute the insert query
- String insertQuery = insertQueryBuilder.toString();
- executeInsert(insertQuery);
+ executeInsert(insertStatementBuilder.buildInsertQuery());
// Reset the batch
recordCount = 0;
- firstRecord = true;
- insertQueryBuilder = initializeInsertQuery();
}
- rowList.clear();
+ insertStatementBuilder.resetRow();
}
@Override
@@ -279,10 +149,8 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
@Override
public void cleanup() throws IOException {
logger.debug("Cleanup record");
- // Execute last query
- String insertQuery = insertQueryBuilder.toString();
if (recordCount != 0) {
- executeInsert(insertQuery);
+ executeInsert(insertStatementBuilder.buildInsertQuery());
}
AutoCloseables.closeSilently(connection);
}
@@ -301,600 +169,254 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
}
}
- private SqlBuilder initializeInsertQuery() {
- SqlBuilder builder = new SqlBuilder(this.dialect);
-
- // Apache Phoenix does not support INSERT but does support UPSERT using the same syntax
- if (dialect == DatabaseProduct.PHOENIX.getDialect()) {
- builder.append("UPSERT INTO ");
- } else {
- builder.append("INSERT INTO ");
- }
-
- JdbcDDLQueryUtils.addTableToInsertQuery(builder, rawTableName);
- builder.append (" VALUES ");
- return builder;
- }
-
- /**
- * Drill returns longs for date values. This function converts longs into dates formatted
- * in YYYY-MM-dd format for insertion into a database.
- * @param dateVal long representing a naive date
- * @return A date string formatted YYYY-MM-dd
- */
- private String formatDateForInsertQuery(Long dateVal) {
- Date date=new Date(dateVal);
- SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
- return df2.format(date);
- }
-
- /**
- * Drill returns longs for time values. This function converts longs into times formatted
- * in HH:mm:ss format for insertion into a database.
- * @param millis Milliseconds since the epoch.
- * @return A time string formatted for insertion into a database.
- */
- private String formatTimeForInsertQuery(Integer millis) {
- return String.format("%02d:%02d:%02d", TimeUnit.MILLISECONDS.toHours(millis),
- TimeUnit.MILLISECONDS.toMinutes(millis) % TimeUnit.HOURS.toMinutes(1),
- TimeUnit.MILLISECONDS.toSeconds(millis) % TimeUnit.MINUTES.toSeconds(1));
- }
-
- /**
- * Drill returns longs for date times. This function converts
- * @param time An input long that represents a timestamp
- * @return A ISO formatted timestamp.
- */
- private String formatTimeStampForInsertQuery(Long time) {
- Date date = new Date(time);
- Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- return format.format(date);
- }
+ public class NullableJdbcConverter extends FieldConverter {
+ private final FieldConverter delegate;
- @Override
- public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableIntJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableIntJDBCConverter extends FieldConverter {
- private final NullableIntHolder holder = new NullableIntHolder();
-
- public NullableIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.INT, DataMode.OPTIONAL));
+ public NullableJdbcConverter(int fieldId, String fieldName, FieldReader reader, FieldConverter delegate) {
+ super(fieldId, fieldName, reader);
+ this.delegate = delegate;
}
@Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
+ public void writeField() throws IOException {
+ if (reader.isSet()) {
+ delegate.writeField();
+ } else {
+ insertStatementBuilder.addRowValue(SqlLiteral.createNull(SqlParserPos.ZERO));
}
- reader.read(holder);
- rowList.add(holder.value);
}
}
- @Override
- public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new IntJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class IntJDBCConverter extends FieldConverter {
- private final IntHolder holder = new IntHolder();
+ public class ExactNumericJdbcConverter extends FieldConverter {
- public IntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.INT, DataMode.REQUIRED));
+ public ExactNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
}
@Override
public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createExactNumeric(String.valueOf(reader.readObject()),
+ SqlParserPos.ZERO));
}
}
- @Override
- public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableBigIntJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableBigIntJDBCConverter extends FieldConverter {
- private final NullableBigIntHolder holder = new NullableBigIntHolder();
+ public class ApproxNumericJdbcConverter extends FieldConverter {
- public NullableBigIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.BIGINT, DataMode.OPTIONAL));
+ public ApproxNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
}
@Override
public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createApproxNumeric(String.valueOf(reader.readObject()),
+ SqlParserPos.ZERO));
}
}
@Override
- public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new BigIntJDBCConverter(fieldId, fieldName, reader, fields);
+ public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ExactNumericJdbcConverter(fieldId, fieldName, reader));
}
- public class BigIntJDBCConverter extends FieldConverter {
- private final BigIntHolder holder = new BigIntHolder();
-
- public BigIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.BIGINT, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ @Override
+ public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
- public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableSmallIntJDBCConverter(fieldId, fieldName, reader, fields);
+ public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ExactNumericJdbcConverter(fieldId, fieldName, reader));
}
- public class NullableSmallIntJDBCConverter extends FieldConverter {
- private final NullableSmallIntHolder holder = new NullableSmallIntHolder();
-
- public NullableSmallIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.SMALLINT, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ @Override
+ public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
- public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new SmallIntJDBCConverter(fieldId, fieldName, reader, fields);
+ public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ExactNumericJdbcConverter(fieldId, fieldName, reader));
}
- public class SmallIntJDBCConverter extends FieldConverter {
- private final SmallIntHolder holder = new SmallIntHolder();
-
- public SmallIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.SMALLINT, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
- }
+ @Override
+ public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableTinyIntJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableTinyIntJDBCConverter extends FieldConverter {
- private final NullableTinyIntHolder holder = new NullableTinyIntHolder();
-
- public NullableTinyIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TINYINT, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ExactNumericJdbcConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
- return new TinyIntJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class TinyIntJDBCConverter extends FieldConverter {
- private final TinyIntHolder holder = new TinyIntHolder();
-
- public TinyIntJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TINYINT, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableFloat4JDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableFloat4JDBCConverter extends FieldConverter {
- private final NullableFloat4Holder holder = new NullableFloat4Holder();
-
- public NullableFloat4JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT4, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
- return new Float4JDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class Float4JDBCConverter extends FieldConverter {
- private final Float4Holder holder = new Float4Holder();
-
- public Float4JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT4, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableFloat8JDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableFloat8JDBCConverter extends FieldConverter {
- private final NullableFloat8Holder holder = new NullableFloat8Holder();
-
- public NullableFloat8JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT8, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
- return new Float8JDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class Float8JDBCConverter extends FieldConverter {
- private final Float8Holder holder = new Float8Holder();
-
- public Float8JDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.FLOAT8, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableVardecimalJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableVardecimalJDBCConverter extends FieldConverter {
- private final NullableVarDecimalHolder holder = new NullableVarDecimalHolder();
-
- public NullableVardecimalJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.VARDECIMAL, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
- holder.start, holder.end - holder.start, holder.scale);
- rowList.add(value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new ExactNumericJdbcConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
- return new VardecimalJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class VardecimalJDBCConverter extends FieldConverter {
- private final VarDecimalHolder holder = new VarDecimalHolder();
-
- public VardecimalJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.VARDECIMAL, DataMode.REQUIRED));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- BigDecimal value = DecimalUtility.getBigDecimalFromDrillBuf(holder.buffer,
- holder.start, holder.end - holder.start, holder.scale);
- rowList.add(value);
- }
+ return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
}
@Override
public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableVarCharJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableVarCharJDBCConverter extends FieldConverter {
- private final NullableVarCharHolder holder = new NullableVarCharHolder();
-
- public NullableVarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.VARCHAR, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- reader.read(holder);
- if (reader.isSet()) {
- byte[] bytes = new byte[holder.end - holder.start];
- holder.buffer.getBytes(holder.start, bytes);
- }
- rowList.add(holder);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new VarCharJDBCConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
- return new VarCharJDBCConverter(fieldId, fieldName, reader, fields);
+ return new VarCharJDBCConverter(fieldId, fieldName, reader);
}
public class VarCharJDBCConverter extends FieldConverter {
- private final VarCharHolder holder = new VarCharHolder();
- public VarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+ public VarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.VARCHAR, DataMode.REQUIRED));
}
@Override
public void writeField() {
- reader.read(holder);
- if (reader.isSet()) {
- byte[] bytes = new byte[holder.end - holder.start];
- holder.buffer.getBytes(holder.start, bytes);
- rowList.add(holder);
- }
+ byte[] bytes = reader.readText().copyBytes();
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createCharString(new String(bytes),
+ SqlParserPos.ZERO));
}
}
@Override
public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableDateJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableDateJDBCConverter extends FieldConverter {
- private final NullableDateHolder holder = new NullableDateHolder();
-
- public NullableDateJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.DATE, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new DateJDBCConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
- return new DateJDBCConverter(fieldId, fieldName, reader, fields);
+ return new DateJDBCConverter(fieldId, fieldName, reader);
}
public class DateJDBCConverter extends FieldConverter {
- private final DateHolder holder = new DateHolder();
- public DateJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+ public DateJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.DATE, DataMode.REQUIRED));
}
@Override
public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createDate(DateString.fromDaysSinceEpoch((int) reader.readLocalDate().toEpochDay()),
+ SqlParserPos.ZERO));
}
}
@Override
public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableTimeJDBCConverter(fieldId, fieldName, reader, fields);
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new TimeJDBCConverter(fieldId, fieldName, reader));
}
- public class NullableTimeJDBCConverter extends FieldConverter {
- private final NullableTimeHolder holder = new NullableTimeHolder();
-
- public NullableTimeJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TIME, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
- }
-
@Override
public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
- return new TimeJDBCConverter(fieldId, fieldName, reader, fields);
+ return new TimeJDBCConverter(fieldId, fieldName, reader);
}
public class TimeJDBCConverter extends FieldConverter {
- private final TimeHolder holder = new TimeHolder();
- public TimeJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+ public TimeJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TIME, DataMode.REQUIRED));
}
@Override
public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createTime(TimeString.fromMillisOfDay(
+ (int) (reader.readLocalTime().toNanoOfDay() / TimeUnit.MILLISECONDS.toNanos(1))),
+ Types.DEFAULT_TIMESTAMP_PRECISION,
+ SqlParserPos.ZERO));
}
}
@Override
public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableTimeStampJDBCConverter(fieldId, fieldName, reader, fields);
- }
-
- public class NullableTimeStampJDBCConverter extends FieldConverter {
- private final NullableTimeStampHolder holder = new NullableTimeStampHolder();
-
- public NullableTimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TIMESTAMP, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- rowList.add(holder.value);
- }
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new TimeStampJDBCConverter(fieldId, fieldName, reader));
}
@Override
public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
- return new TimeStampJDBCConverter(fieldId, fieldName, reader, fields);
+ return new TimeStampJDBCConverter(fieldId, fieldName, reader);
}
public class TimeStampJDBCConverter extends FieldConverter {
- private final TimeStampHolder holder = new TimeStampHolder();
- public TimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+ public TimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.TIMESTAMP, DataMode.REQUIRED));
}
@Override
public void writeField() {
- reader.read(holder);
- rowList.add(holder.value);
+ insertStatementBuilder.addRowValue(
+ SqlLiteral.createTimestamp(
+ TimestampString.fromMillisSinceEpoch(reader.readLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli()),
+ Types.DEFAULT_TIMESTAMP_PRECISION,
+ SqlParserPos.ZERO));
}
}
@Override
public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
- return new NullableBitJDBCConverter(fieldId, fieldName, reader, fields);
+ return new NullableJdbcConverter(fieldId, fieldName, reader,
+ new BitJDBCConverter(fieldId, fieldName, reader));
}
- public class NullableBitJDBCConverter extends FieldConverter {
- private final NullableBitHolder holder = new NullableBitHolder();
-
- public NullableBitJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
- super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.BIT, DataMode.OPTIONAL));
- }
-
- @Override
- public void writeField() {
- if (!reader.isSet()) {
- rowList.add("null");
- return;
- }
- reader.read(holder);
- String booleanValue = "false";
- if (holder.value == 1) {
- booleanValue = "true";
- }
- rowList.add(booleanValue);
- }
- }
@Override
public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
- return new BitJDBCConverter(fieldId, fieldName, reader, fields);
+ return new BitJDBCConverter(fieldId, fieldName, reader);
}
public class BitJDBCConverter extends FieldConverter {
- private final BitHolder holder = new BitHolder();
- public BitJDBCConverter(int fieldID, String fieldName, FieldReader reader, List<JdbcWriterField> fields) {
+ public BitJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
super(fieldID, fieldName, reader);
- fields.add(new JdbcWriterField(fieldName, MinorType.BIT, DataMode.REQUIRED));
}
@Override
public void writeField() {
- reader.read(holder);
- String booleanValue = "false";
- if (holder.value == 1) {
- booleanValue = "true";
- }
- rowList.add(booleanValue);
+ insertStatementBuilder.addRowValue(SqlLiteral.createBoolean(reader.readBoolean(), SqlParserPos.ZERO));
}
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 64d89ecb79..8489087465 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -92,7 +93,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
getName(),
userCredentials.getUserName()
);
- return Optional.<DataSource>empty();
+ return Optional.empty();
}
// Missing creds is valid under SHARED_USER (e.g. unsecured DBs, BigQuery's OAuth)
@@ -138,15 +139,30 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext context) {
- UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
- Optional<DataSource> dataSource = getDataSource(userCreds);
+ public boolean supportsInsert() {
+ return jdbcStorageConfig.isWritable();
+ }
- if (!dataSource.isPresent()) {
- return ImmutableSet.of();
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(
+ OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case LOGICAL:
+ case PHYSICAL: {
+ UserCredentials userCreds = optimizerContext.getContextInformation().getQueryUserCredentials();
+
+ String userName = userCreds.getUserName();
+ return getDataSource(userCreds)
+ .map(dataSource -> getConvention(getDialect(dataSource), userName).getRules())
+ .orElse(ImmutableSet.of());
+ }
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return ImmutableSet.of();
}
-
- return getConvention( getDialect(dataSource.get()), userCreds.getUserName() ).getRules();
}
@Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
new file mode 100644
index 0000000000..3895a087cd
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.exec.record.VectorAccessible;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+public class JdbcTableModifyWriter extends JdbcRecordWriter {
+
+ public JdbcTableModifyWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+ super(source, tableIdentifier, config);
+ }
+
+ @Override
+ public void updateSchema(VectorAccessible batch) {
+ // no-op
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
index 05924a1d80..dadd85086a 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriter.java
@@ -17,11 +17,10 @@
*/
package org.apache.drill.exec.store.jdbc;
-import java.io.IOException;
+import java.util.List;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -37,25 +36,25 @@ public class JdbcWriter extends AbstractWriter {
public static final String OPERATOR_TYPE = "JDBC_WRITER";
private final JdbcStoragePlugin plugin;
- private final String tableName;
+ private final List<String> tableIdentifier;
private final JdbcSchema inner;
@JsonCreator
public JdbcWriter(
@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("name") String name,
+ @JsonProperty("tableIdentifier") List<String> tableIdentifier,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JacksonInject JdbcSchema inner,
- @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+ @JacksonInject StoragePluginRegistry engineRegistry) {
super(child);
this.plugin = engineRegistry.resolve(storageConfig, JdbcStoragePlugin.class);
- this.tableName = name;
+ this.tableIdentifier = tableIdentifier;
this.inner = inner;
}
- JdbcWriter(PhysicalOperator child, String name, JdbcSchema inner, JdbcStoragePlugin plugin) {
+ JdbcWriter(PhysicalOperator child, List<String> tableIdentifier, JdbcSchema inner, JdbcStoragePlugin plugin) {
super(child);
- this.tableName = name;
+ this.tableIdentifier = tableIdentifier;
this.plugin = plugin;
this.inner = inner;
}
@@ -67,11 +66,11 @@ public class JdbcWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new JdbcWriter(child, tableName, inner, plugin);
+ return new JdbcWriter(child, tableIdentifier, inner, plugin);
}
- public String getTableName() {
- return tableName;
+ public List<String> getTableIdentifier() {
+ return tableIdentifier;
}
public StoragePluginConfig getStorage() {
@@ -89,7 +88,7 @@ public class JdbcWriter extends AbstractWriter {
@Override
public String toString() {
return new PlanStringBuilder(this)
- .field("tableName", tableName)
+ .field("tableName", tableIdentifier)
.field("storageStrategy", getStorageStrategy())
.toString();
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 182c104714..4373ba6303 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
+@SuppressWarnings("unused")
public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
@Override
@@ -48,7 +49,7 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
config,
children.iterator().next(),
context,
- new JdbcRecordWriter(ds, null, config.getTableName(), config)
+ new JdbcRecordWriter(ds, config.getTableIdentifier(), config)
);
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
index f05ded75f0..015bd7aff6 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/CreateTableStmtBuilder.java
@@ -18,126 +18,130 @@
package org.apache.drill.exec.store.jdbc.utils;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.sql.SqlBasicTypeNameSpec;
+import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.ddl.SqlDdlNodes;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
-import org.apache.parquet.Strings;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.JDBCType;
+import java.util.List;
public class CreateTableStmtBuilder {
private static final Logger logger = LoggerFactory.getLogger(CreateTableStmtBuilder.class);
public static final int DEFAULT_VARCHAR_PRECISION = 100;
- private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
- private StringBuilder createTableQuery;
- private final String tableName;
+ private final List<String> tableIdentifier;
private final SqlDialect dialect;
- private StringBuilder columns;
+ private final SqlNodeList sqlColumns = new SqlNodeList(SqlParserPos.ZERO);
- public CreateTableStmtBuilder(String tableName, SqlDialect dialect) {
- if (Strings.isNullOrEmpty(tableName)) {
+ public CreateTableStmtBuilder(List<String> tableIdentifier, SqlDialect dialect) {
+ if (CollectionUtils.isEmpty(tableIdentifier)) {
throw new UnsupportedOperationException("Table name cannot be empty");
}
- this.tableName = tableName;
+ this.tableIdentifier = tableIdentifier;
this.dialect = dialect;
- columns = new StringBuilder();
}
/**
* Adds a column to the CREATE TABLE statement
- * @param colName The column to be added to the table
- * @param type The Drill MinorType of the column
- * @param nullable If the column is nullable or not.
- * @param precision The precision, or overall length of a column
- * @param scale The scale, or number of digits after the decimal
*/
- public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
- StringBuilder queryText = new StringBuilder();
- String jdbcColTypeName = "";
- try {
- Integer jdbcColType = JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type);
- jdbcColTypeName = JDBCType.valueOf(jdbcColType).getName();
-
- if (dialect instanceof PostgresqlSqlDialect) {
- // pg data type name special case
- if (jdbcColType.equals(java.sql.Types.DOUBLE)) {
- // TODO: Calcite will incorrectly output DOUBLE instead of DOUBLE PRECISION under the pg dialect
- jdbcColTypeName = "FLOAT";
- }
+ public void addColumn(MaterializedField field) {
+ TypeProtos.MajorType majorType = populateScaleAndPrecisionIfRequired(field.getType());
+ int jdbcType = Types.getJdbcTypeCode(Types.getSqlTypeName(majorType));
+ if (dialect instanceof PostgresqlSqlDialect) {
+ // pg data type name special case
+ if (jdbcType == java.sql.Types.DOUBLE) {
+ // TODO: Calcite will incorrectly output DOUBLE instead of DOUBLE PRECISION under the pg dialect
+ jdbcType = java.sql.Types.FLOAT;
}
- } catch (NullPointerException e) {
- // JDBC Does not support writing complex fields to databases
+ }
+ SqlTypeName sqlTypeName = SqlTypeName.getNameForJdbcType(jdbcType);
+
+ if (sqlTypeName == null) {
throw UserException.dataWriteError()
.message("Drill does not support writing complex fields to JDBC data sources.")
- .addContext(colName + " is a complex type.")
+ .addContext(field.getName() + " is a complex type.")
.build(logger);
}
- queryText.append(colName).append(" ").append(jdbcColTypeName);
+ int precision = majorType.hasPrecision()
+ ? majorType.getPrecision()
+ : -1;
- // Add precision or scale if applicable
- if (jdbcColTypeName.equals("VARCHAR")) {
- int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
- queryText.append("(").append(max_precision).append(")");
- }
+ int scale = majorType.hasScale()
+ ? majorType.getScale()
+ : -1;
- if (!nullable) {
- queryText.append(" NOT NULL");
- }
+ SqlBasicTypeNameSpec typeNameSpec = new SqlBasicTypeNameSpec(
+ sqlTypeName, precision, scale, SqlParserPos.ZERO);
- if (! Strings.isNullOrEmpty(columns.toString())) {
- columns.append(",\n");
- }
+ SqlDataTypeSpec sqlDataTypeSpec = new SqlDataTypeSpec(
+ typeNameSpec,
+ SqlParserPos.ZERO).withNullable(field.isNullable());
+
+ ColumnStrategy columnStrategy = field.isNullable()
+ ? ColumnStrategy.NULLABLE
+ : ColumnStrategy.NOT_NULLABLE;
+
+ SqlNode sqlColumnDeclaration = SqlDdlNodes.column(SqlParserPos.ZERO,
+ new SqlIdentifier(field.getName(), SqlParserPos.ZERO),
+ sqlDataTypeSpec,
+ null,
+ columnStrategy);
- columns.append(queryText);
+ sqlColumns.add(sqlColumnDeclaration);
}
/**
* Generates the CREATE TABLE query.
* @return The create table query.
*/
- public CreateTableStmtBuilder build() {
- createTableQuery = new StringBuilder();
- createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
- createTableQuery.append(columns);
- createTableQuery.append("\n)");
- return this;
- }
+ public String build() {
+ SqlIdentifier sqlIdentifier = new SqlIdentifier(tableIdentifier, SqlParserPos.ZERO);
+ SqlNode createTable = SqlDdlNodes.createTable(SqlParserPos.ZERO,
+ false, false, sqlIdentifier, sqlColumns, null);
- public String getCreateTableQuery() {
- return createTableQuery != null ? createTableQuery.toString() : null;
+ return createTable.toSqlString(dialect, true).getSql();
}
- @Override
- public String toString() {
- return getCreateTableQuery();
- }
-
- /**
- * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
- * @param table The table
- * @param catalog The database catalog
- * @param schema The database schema
- * @return The table with catalog and schema added, if present
- */
- public static String buildCompleteTableName(String table, String catalog, String schema) {
- logger.debug("Building complete table.");
- StringBuilder completeTable = new StringBuilder();
- if (! Strings.isNullOrEmpty(catalog)) {
- completeTable.append(catalog);
- completeTable.append(".");
- }
-
- if (! Strings.isNullOrEmpty(schema)) {
- completeTable.append(schema);
- completeTable.append(".");
+ private static TypeProtos.MajorType populateScaleAndPrecisionIfRequired(TypeProtos.MajorType type) {
+ switch (type.getMinorType()) {
+ case VARDECIMAL:
+ if (!type.hasPrecision()) {
+ type = type.toBuilder().setPrecision(Types.maxPrecision(type.getMinorType())).build();
+ }
+ if (!type.hasScale()) {
+ type = type.toBuilder().setScale(0).build();
+ }
+ break;
+ case FIXEDCHAR:
+ case FIXED16CHAR:
+ case VARCHAR:
+ case VAR16CHAR:
+ case VARBINARY:
+ if (!type.hasPrecision()) {
+ type = type.toBuilder().setPrecision(DEFAULT_VARCHAR_PRECISION).build();
+ }
+ case TIMESTAMP:
+ case TIME:
+ if (!type.hasPrecision()) {
+ type = type.toBuilder().setPrecision(Types.DEFAULT_TIMESTAMP_PRECISION).build();
+ }
+ default:
}
- completeTable.append(table);
- return JdbcDDLQueryUtils.addBackTicksToTable(completeTable.toString());
+ return type;
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java
new file mode 100644
index 0000000000..63be096fa7
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/InsertStatementBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlInternalOperators;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InsertStatementBuilder {
+
+ private final List<SqlNode> sqlRows = new ArrayList<>();
+ private final List<SqlNode> sqlRowValues = new ArrayList<>();
+ private final SqlDialect dialect;
+ private final List<String> tableIdentifier;
+
+ public InsertStatementBuilder(List<String> tableIdentifier, SqlDialect dialect) {
+ this.dialect = dialect;
+ this.tableIdentifier = tableIdentifier;
+ }
+
+ public void addRowValue(SqlNode value) {
+ sqlRowValues.add(value);
+ }
+
+ public void endRecord() {
+ sqlRows.add(SqlInternalOperators.ANONYMOUS_ROW.createCall(
+ SqlParserPos.ZERO, sqlRowValues.toArray(new SqlNode[0])));
+ resetRow();
+ }
+
+ public void resetRow() {
+ sqlRowValues.clear();
+ }
+
+ public String buildInsertQuery() {
+ SqlCall values = SqlStdOperatorTable.VALUES.createCall(
+ SqlParserPos.ZERO, sqlRows.toArray(new SqlNode[0]));
+ resetRow();
+ sqlRows.clear();
+ SqlInsert sqlInsert = new SqlInsert(
+ SqlParserPos.ZERO,
+ SqlNodeList.EMPTY,
+ new SqlIdentifier(tableIdentifier, SqlParserPos.ZERO),
+ values,
+ null);
+
+ return sqlInsert.toSqlString(dialect).getSql();
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
deleted file mode 100644
index 95b927c6b5..0000000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.jdbc.utils;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.SqlBuilder;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class JdbcDDLQueryUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
- /**
- * Converts a given SQL query from the generic dialect to the destination system dialect. Returns
- * null if the original query is not valid. This should only be used for CTAS queries.
- *
- * @param query An ANSI SQL statement (CTAS Only)
- * @param dialect The destination system dialect
- * @return A representation of the original query in the destination dialect
- */
- public static String cleanDDLQuery(String query, SqlDialect dialect) {
- SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
- .setParserFactory(SqlDdlParserImpl.FACTORY)
- .setConformance(SqlConformanceEnum.STRICT_2003)
- .setCaseSensitive(true)
- .setLex(Lex.MYSQL)
- .build();
-
- try {
- SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
-
- return node.toSqlString(dialect).getSql();
- } catch (SqlParseException e) {
- logger.error(e.getMessage());
- return null;
- }
- }
-
- /**
- * This function adds backticks around table names. If the table name already has backticks,
- * it does nothing.
- * @param inputTable The table name with or without backticks
- * @return The table name with backticks added.
- */
- public static String addBackTicksToTable(String inputTable) {
- String[] queryParts = inputTable.split("\\.");
- StringBuilder cleanQuery = new StringBuilder();
-
- int counter = 0;
- for (String part : queryParts) {
- if (counter > 0) {
- cleanQuery.append(".");
- }
-
- if (part.startsWith("`") && part.endsWith("`")) {
- cleanQuery.append(part);
- } else {
- cleanQuery.append("`").append(part).append("`");
- }
- counter++;
- }
-
- return cleanQuery.toString();
- }
-
- public static void addTableToInsertQuery(SqlBuilder builder, String rawEscapedTables) {
- final String TABLE_REGEX = "(?:`(.+?)`)+";
- Pattern pattern = Pattern.compile(TABLE_REGEX);
- Matcher matcher = pattern.matcher(rawEscapedTables);
-
- int matchCount = 0;
- while (matcher.find()) {
- if (matchCount > 0) {
- builder.append(".");
- }
- builder.identifier(matcher.group(1));
- matchCount++;
- }
- }
-
- public static String addBackTicksToField(String field) {
- if (field.startsWith("`") && field.endsWith("`")) {
- return field;
- } else {
- StringBuilder cleanField = new StringBuilder();
- return cleanField.append("`").append(field).append("`").toString();
- }
- }
-}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java
deleted file mode 100644
index 9d827a642a..0000000000
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestCreateTableStmtBuilder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.jdbc;
-
-import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestCreateTableStmtBuilder {
-
- @Test
- public void testSimpleTable() {
- String table = "table";
- String schema = "schema";
- String catalog = "catalog";
-
- String completeTable = CreateTableStmtBuilder.buildCompleteTableName(table, catalog, schema);
- assertEquals("`catalog`.`schema`.`table`", completeTable);
- assertEquals("`catalog`.`table`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, ""));
- assertEquals("`catalog`.`table`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, null));
- }
-
- @Test
- public void testTablesWithSpaces() {
- String table = "table with spaces";
- String schema = "schema with spaces";
- String catalog = "catalog with spaces";
-
- String completeTable = CreateTableStmtBuilder.buildCompleteTableName(table, catalog, schema);
- assertEquals("`catalog with spaces`.`schema with spaces`.`table with spaces`", completeTable);
- assertEquals("`catalog with spaces`.`table with spaces`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, ""));
- assertEquals("`catalog with spaces`.`table with spaces`", CreateTableStmtBuilder.buildCompleteTableName(table, catalog, null));
- }
-}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java
new file mode 100644
index 0000000000..ab0276d2ac
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithH2.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.enumerable.plan.EnumMockPlugin;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.h2.tools.RunScript;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.FileReader;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithH2 extends ClusterTest {
+
+ @BeforeClass
+ public static void init() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ // Force timezone to UTC for these tests.
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+ Class.forName("org.h2.Driver");
+ String connString = "jdbc:h2:" + dirTestWatcher.getTmpDir().getCanonicalPath();
+ URL scriptFile = TestJdbcPluginWithH2IT.class.getClassLoader().getResource("h2-test-data.sql");
+ assertNotNull("Script for test tables generation 'h2-test-data.sql' cannot be found in test resources", scriptFile);
+ try (Connection connection = DriverManager.getConnection(connString, "root", "root");
+ FileReader fileReader = new FileReader(scriptFile.getFile())) {
+ RunScript.execute(connection, fileReader);
+ }
+
+ Map<String, String> credentials = new HashMap<>();
+ credentials.put("username", "root");
+ credentials.put("password", "root");
+ PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+ Map<String, Object> sourceParameters = new HashMap<>();
+ sourceParameters.put("minimumIdle", 1);
+ JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
+ "root", "root", true, true, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
+ jdbcStorageConfig.setEnabled(true);
+
+ JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("org.h2.Driver", connString,
+ "root", "root", true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
+ jdbcStorageConfig.setEnabled(true);
+ jdbcStorageConfigNoWrite.setEnabled(true);
+
+ cluster.defineStoragePlugin("h2", jdbcStorageConfig);
+ cluster.defineStoragePlugin("h2_unwritable", jdbcStorageConfigNoWrite);
+
+ EnumMockPlugin.EnumMockStoragePluginConfig config = new EnumMockPlugin.EnumMockStoragePluginConfig();
+ config.setEnabled(true);
+ cluster.defineStoragePlugin("mocked_enum", config);
+ }
+
+ @Test
+ public void testInsertValues() throws Exception {
+ String tableName = "h2.tmp.drill_h2_test.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(1L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectValues() throws Exception {
+ String tableName = "h2.tmp.drill_h2_test.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromJdbcTable() throws Exception {
+ String tableName = "h2.tmp.drill_h2_test.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromNonJdbcTable() throws Exception {
+ String tableName = "h2.tmp.drill_h2_test.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .exclude("Jdbc\\(sql=\\[INSERT INTO") // insert cannot be pushed down
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(3L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(0, 0)
+ .baselineValues(1, 1)
+ .baselineValues(2, 1)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java
new file mode 100644
index 0000000000..dadcf0b5bb
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithMySQL.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.ext.ScriptUtils;
+import org.testcontainers.jdbc.JdbcDatabaseDelegate;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithMySQL extends ClusterTest {
+ private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
+ private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
+ private static final Logger logger = LoggerFactory.getLogger(TestJdbcInsertWithMySQL.class);
+ private static JdbcDatabaseContainer<?> jdbcContainer;
+
+ @BeforeClass
+ public static void initMysql() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ String osName = System.getProperty("os.name").toLowerCase();
+ String mysqlDBName = "drill_mysql_test";
+
+ DockerImageName imageName;
+ if (osName.startsWith("linux") && "aarch64".equals(System.getProperty("os.arch"))) {
+ imageName = DockerImageName.parse(DOCKER_IMAGE_MARIADB).asCompatibleSubstituteFor("mysql");
+ } else {
+ imageName = DockerImageName.parse(DOCKER_IMAGE_MYSQL);
+ }
+
+ jdbcContainer = new MySQLContainer<>(imageName)
+ .withExposedPorts(3306)
+ .withConfigurationOverride("mysql_config_override")
+ .withUsername("mysqlUser")
+ .withPassword("mysqlPass")
+ .withDatabaseName(mysqlDBName)
+ .withUrlParam("serverTimezone", "UTC")
+ .withUrlParam("useJDBCCompliantTimezoneShift", "true")
+ .withInitScript("mysql-test-data.sql");
+ jdbcContainer.start();
+
+ if (osName.startsWith("linux")) {
+ JdbcDatabaseDelegate databaseDelegate = new JdbcDatabaseDelegate(jdbcContainer, "");
+ ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
+ }
+
+ Map<String, Object> sourceParameters = new HashMap<>();
+ sourceParameters.put("maximumPoolSize", "1");
+ sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+ sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+ sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+ sourceParameters.put("minimumIdle", "0");
+
+ String jdbcUrl = jdbcContainer.getJdbcUrl();
+ logger.debug("JDBC URL: {}", jdbcUrl);
+ JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+ jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+ jdbcStorageConfig.setEnabled(true);
+
+ cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
+
+ JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+ jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+ jdbcStorageConfigNoWrite.setEnabled(true);
+
+ cluster.defineStoragePlugin("mysql_no_write", jdbcStorageConfigNoWrite);
+
+ if (osName.startsWith("linux")) {
+ // adds storage plugin with case insensitive table names
+ JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+ jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+ jdbcCaseSensitiveStorageConfig.setEnabled(true);
+ cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
+ }
+ }
+
+ @Test
+ public void testInsertValues() throws Exception {
+ String tableName = "mysql.`drill_mysql_test`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(1L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectValues() throws Exception {
+ String tableName = "mysql.`drill_mysql_test`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromJdbcTable() throws Exception {
+ String tableName = "mysql.`drill_mysql_test`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromNonJdbcTable() throws Exception {
+ String tableName = "mysql.`drill_mysql_test`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .exclude("Jdbc\\(sql=\\[INSERT INTO") // insert cannot be pushed down
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(3L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(0, 0)
+ .baselineValues(1, 1)
+ .baselineValues(2, 1)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @AfterClass
+ public static void stopMysql() {
+ if (jdbcContainer != null) {
+ jdbcContainer.stop();
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java
new file mode 100644
index 0000000000..fcb6659948
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcInsertWithPostgres.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcInsertWithPostgres extends ClusterTest {
+
+ private static final String DOCKER_IMAGE_POSTGRES_X86 = "postgres:12.8-alpine3.14";
+ private static JdbcDatabaseContainer<?> jdbcContainer;
+
+ @BeforeClass
+ public static void initPostgres() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ dirTestWatcher.copyResourceToRoot(Paths.get(""));
+
+ String postgresDBName = "drill_postgres_test";
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+ DockerImageName imageName = DockerImageName.parse(DOCKER_IMAGE_POSTGRES_X86);
+ jdbcContainer = new PostgreSQLContainer<>(imageName)
+ .withUsername("postgres")
+ .withPassword("password")
+ .withDatabaseName(postgresDBName)
+ .withInitScript("postgres-test-data.sql");
+ jdbcContainer.start();
+
+ Map<String, Object> sourceParameters = new HashMap<>();
+ sourceParameters.put("maximumPoolSize", "16");
+ sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+ sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+ sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+ sourceParameters.put("minimumIdle", "0");
+
+ JdbcStorageConfig jdbcStorageConfig =
+ new JdbcStorageConfig("org.postgresql.Driver",
+ jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
+ true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+ jdbcStorageConfig.setEnabled(true);
+ cluster.defineStoragePlugin("pg", jdbcStorageConfig);
+
+ JdbcStorageConfig unWritableJdbcStorageConfig =
+ new JdbcStorageConfig("org.postgresql.Driver",
+ jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
+ true, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
+ unWritableJdbcStorageConfig.setEnabled(true);
+ cluster.defineStoragePlugin("pg_unwritable", unWritableJdbcStorageConfig);
+
+ }
+
+ @AfterClass
+ public static void stopPostgres() {
+ if (jdbcContainer != null) {
+ jdbcContainer.stop();
+ }
+ }
+
+ @Test
+ public void testInsertValues() throws Exception {
+ String tableName = "`pg`.`public`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "insert into %s(ID, NAME) VALUES (3,4)";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(1L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectValues() throws Exception {
+ String tableName = "`pg`.`public`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM (VALUES(1,2), (3,4))";
+ queryBuilder()
+ .sql(insertQuery, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromJdbcTable() throws Exception {
+ String tableName = "`pg`.`public`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2), (3,4))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT * FROM %s";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .include("Jdbc\\(sql=\\[INSERT INTO")
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(2L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .baselineValues(1, 2)
+ .baselineValues(3, 4)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+
+ @Test
+ public void testInsertSelectFromNonJdbcTable() throws Exception {
+ String tableName = "`pg`.`public`.`test_table`";
+ try {
+ String query = "CREATE TABLE %s (ID, NAME) AS (VALUES(1,2))";
+ // Create the table and insert the values
+ QuerySummary insertResults = queryBuilder()
+ .sql(query, tableName)
+ .run();
+ assertTrue(insertResults.succeeded());
+
+ String insertQuery = "INSERT INTO %s SELECT n_nationkey, n_regionkey FROM cp.`tpch/nation.parquet` limit 3";
+ queryBuilder()
+ .sql(insertQuery, tableName, tableName)
+ .planMatcher()
+ .exclude("Jdbc\\(sql=\\[INSERT INTO") // insert cannot be pushed down
+ .match();
+
+ testBuilder()
+ .sqlQuery(insertQuery, tableName, tableName)
+ .unOrdered()
+ .baselineColumns("ROWCOUNT")
+ .baselineValues(3L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("ID", "NAME")
+ .baselineValues(1, 2)
+ .baselineValues(0, 0)
+ .baselineValues(1, 1)
+ .baselineValues(2, 1)
+ .go();
+ } finally {
+ queryBuilder()
+ .sql("DROP TABLE IF EXISTS %s", tableName)
+ .run();
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
index 0335e8a58b..b0ce8caa08 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
@@ -35,6 +35,8 @@ import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
import org.h2.tools.RunScript;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -159,15 +161,15 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
.addNullable("bigint_field", MinorType.BIGINT, 38)
.addNullable("float4_field", MinorType.FLOAT4, 38)
.addNullable("float8_field", MinorType.FLOAT8, 38)
- .addNullable("varchar_field", MinorType.VARCHAR, 38)
+ .addNullable("varchar_field", MinorType.VARCHAR, 3)
.addNullable("date_field", MinorType.DATE, 10)
- .addNullable("time_field", MinorType.TIME, 8)
- .addNullable("timestamp_field", MinorType.TIMESTAMP, 26, 6)
+ .addNullable("time_field", MinorType.TIME, 12, 3)
+ .addNullable("timestamp_field", MinorType.TIMESTAMP, 23, 3)
.addNullable("boolean_field", MinorType.BIT, 1)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+ .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
.build();
RowSetUtilities.verify(expected, results);
@@ -388,7 +390,8 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
queryBuilder().sql(sql).run();
fail();
} catch (UserRemoteException e) {
- assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+ MatcherAssert.assertThat(e.getMessage(),
+ CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
}
}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
index 4b6b38d0d0..95158989a1 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
@@ -33,6 +33,8 @@ import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -59,11 +61,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-/**
- * JDBC storage plugin tests against MySQL.
- * Note: it requires libaio1.so library on Linux
- */
-
@Category(JdbcStorageTest.class)
public class TestJdbcWriterWithMySQL extends ClusterTest {
private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
@@ -283,15 +280,15 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
.addNullable("bigint_field", MinorType.BIGINT, 19)
.addNullable("float4_field", MinorType.FLOAT8, 12)
.addNullable("float8_field", MinorType.FLOAT8, 22)
- .addNullable("varchar_field", MinorType.VARCHAR, 38)
+ .addNullable("varchar_field", MinorType.VARCHAR, 3)
.addNullable("date_field", MinorType.DATE, 10)
- .addNullable("time_field", MinorType.TIME, 10)
- .addNullable("timestamp_field", MinorType.TIMESTAMP, 19)
- .addNullable("boolean_field", MinorType.BIT)
+ .addNullable("time_field", MinorType.TIME, 14)
+ .addNullable("timestamp_field", MinorType.TIMESTAMP, 23)
+ .addNullable("boolean_field", MinorType.BIT, 1)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+ .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
.build();
RowSetUtilities.verify(expected, results);
@@ -435,7 +432,8 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
queryBuilder().sql(sql).run();
fail();
} catch (UserRemoteException e) {
- assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not support writing complex fields to JDBC data sources."));
+ MatcherAssert.assertThat(e.getMessage(),
+ CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not support writing complex fields to JDBC data sources."));
}
}
@@ -447,7 +445,8 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
queryBuilder().sql(sql).run();
fail();
} catch (UserRemoteException e) {
- assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+ MatcherAssert.assertThat(e.getMessage(),
+ CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
}
}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
index 8c30691976..43ca9a9928 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
@@ -33,6 +33,8 @@ import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -185,15 +187,15 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
.addNullable("bigint_field", MinorType.BIGINT, 19)
.addNullable("float4_field", MinorType.FLOAT8, 17, 17)
.addNullable("float8_field", MinorType.FLOAT8, 17, 17)
- .addNullable("varchar_field", MinorType.VARCHAR, 38)
- .addNullable("date_field", MinorType.DATE, 10)
- .addNullable("time_field", MinorType.TIME, 10)
- .addNullable("timestamp_field", MinorType.TIMESTAMP, 19)
- .addNullable("boolean_field", MinorType.BIT)
+ .addNullable("varchar_field", MinorType.VARCHAR, 3)
+ .addNullable("date_field", MinorType.DATE, 13)
+ .addNullable("time_field", MinorType.TIME, 12, 3)
+ .addNullable("timestamp_field", MinorType.TIMESTAMP, 26, 3)
+ .addNullable("boolean_field", MinorType.BIT, 1)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155000L, true)
+ .addRow(1, 2L, 3.0, 4.0, "5.0", LocalDate.parse("2021-01-01"), LocalTime.parse("12:00"), 1451516155230L, true)
.build();
RowSetUtilities.verify(expected, results);
@@ -459,7 +461,8 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
queryBuilder().sql(sql).run();
fail();
} catch (UserRemoteException e) {
- assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
+ MatcherAssert.assertThat(e.getMessage(),
+ CoreMatchers.containsString("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. repeated_field is an array."));
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 57ba2da6ba..fadc8a348d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -68,7 +68,7 @@ import io.netty.buffer.DrillBuf;
// in fragment contexts
public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
- public enum SqlStatementType {OTHER, ANALYZE, CTAS, EXPLAIN, DESCRIBE_TABLE, DESCRIBE_SCHEMA, REFRESH, SELECT, SETOPTION};
+ public enum SqlStatementType {OTHER, ANALYZE, CTAS, EXPLAIN, DESCRIBE_TABLE, DESCRIBE_SCHEMA, REFRESH, SELECT, SETOPTION, INSERT};
private final DrillbitContext drillbitContext;
private final UserSession session;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index be6eae8b2d..ea2221abaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StatisticsAggregate;
import org.apache.drill.exec.physical.config.StatisticsMerge;
import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TableModify;
import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -230,6 +231,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
return visitOp(op, value);
}
+ @Override
+ public T visitTableModify(TableModify op, X value) throws E {
+ return visitOp(op, value);
+ }
+
@Override
public T visitOp(PhysicalOperator op, X value) throws E {
throw new UnsupportedOperationException(String.format(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 1a63538edd..d1b1479bb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StatisticsAggregate;
import org.apache.drill.exec.physical.config.StatisticsMerge;
import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.TableModify;
import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -94,4 +95,5 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitLateralJoin(LateralJoinPOP lateralJoinPOP, EXTRA value) throws EXCEP;
public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP;
+ public RETURN visitTableModify(TableModify op, EXTRA value) throws EXCEP;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java
new file mode 100644
index 0000000000..d900075e5d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TableModify.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+@JsonTypeName("table_modify")
+public class TableModify extends AbstractSingle {
+ public static final String OPERATOR_TYPE = "TABLE_MODIFY";
+
+ @JsonCreator
+ public TableModify(@JsonProperty("child") PhysicalOperator child) {
+ super(child);
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new TableModify(child);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitTableModify(this, value);
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getSVMode() {
+ return child.getSVMode();
+ }
+
+ @Override
+ public String getOperatorType() {
+ return OPERATOR_TYPE;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java
new file mode 100644
index 0000000000..5e9a7efaaf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/InsertWriterRecordBatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+
+public class InsertWriterRecordBatch extends WriterRecordBatch {
+
+ public InsertWriterRecordBatch(Writer writer, RecordBatch incoming,
+ FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+ super(writer, incoming, context, recordWriter);
+ }
+
+ @Override
+ protected void addOutputContainerData() {
+ BigIntVector rowcountVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
+ container.getValueVectorId(SchemaPath.getSimplePath("ROWCOUNT")).getFieldIds())
+ .getValueVector();
+ AllocationHelper.allocate(rowcountVector, 1, 8);
+ rowcountVector.getMutator().setSafe(0, counter);
+ rowcountVector.getMutator().setValueCount(1);
+
+ container.setRecordCount(1);
+ }
+
+ protected void addOutputSchema() {
+ // Create vector for ROWCOUNT - number of records written.
+ final MaterializedField rowcountField =
+ MaterializedField.create("ROWCOUNT",
+ Types.required(TypeProtos.MinorType.BIGINT));
+
+ container.addOrGet(rowcountField);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 3c2ba083dc..307d5b269c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -47,7 +47,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
private EventBasedRecordWriter eventBasedRecordWriter;
private RecordWriter recordWriter;
- private long counter;
+ protected long counter;
private final RecordBatch incoming;
private boolean processed;
private final String fragmentUniqueId;
@@ -134,7 +134,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
return IterOutcome.OK_NEW_SCHEMA;
}
- private void addOutputContainerData() {
+ protected void addOutputContainerData() {
final VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(
VarCharVector.class,
container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds())
@@ -163,17 +163,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
.addContext("Failure updating record writer schema")
.build(logger);
}
- // Create two vectors for:
- // 1. Fragment unique id.
- // 2. Summary: currently contains number of records written.
- final MaterializedField fragmentIdField =
- MaterializedField.create("Fragment", Types.required(MinorType.VARCHAR));
- final MaterializedField summaryField =
- MaterializedField.create("Number of records written",
- Types.required(MinorType.BIGINT));
-
- container.addOrGet(fragmentIdField);
- container.addOrGet(summaryField);
+ addOutputSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
} finally {
stats.stopSetup();
@@ -190,6 +180,20 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
schema = container.getSchema();
}
+ protected void addOutputSchema() {
+ // Create two vectors for:
+ // 1. Fragment unique id.
+ // 2. Summary: currently contains number of records written.
+ final MaterializedField fragmentIdField =
+ MaterializedField.create("Fragment", Types.required(MinorType.VARCHAR));
+ final MaterializedField summaryField =
+ MaterializedField.create("Number of records written",
+ Types.required(MinorType.BIGINT));
+
+ container.addOrGet(fragmentIdField);
+ container.addOrGet(summaryField);
+ }
+
/** Clean up needs to be performed before closing writer. Partially written data will be removed. */
private void closeWriter() {
if (recordWriter == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 524400dfc3..a0f8024fea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -20,9 +20,11 @@ package org.apache.drill.exec.planner;
import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
import org.apache.drill.exec.planner.logical.DrillDistinctJoinToSemiJoinRule;
import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
+import org.apache.drill.exec.planner.logical.DrillTableModifyRule;
import org.apache.drill.exec.planner.physical.MetadataAggPrule;
import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
+import org.apache.drill.exec.planner.physical.TableModifyPrule;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -370,7 +372,8 @@ public enum PlannerPhase {
DrillUnionAllRule.INSTANCE,
DrillValuesRule.INSTANCE,
DrillUnnestRule.INSTANCE,
- DrillCorrelateRule.INSTANCE
+ DrillCorrelateRule.INSTANCE,
+ DrillTableModifyRule.INSTANCE
).build();
/**
@@ -552,6 +555,7 @@ public enum PlannerPhase {
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
+ ruleList.add(TableModifyPrule.INSTANCE);
if (ps.isHashAggEnabled()) {
ruleList.add(HashAggPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index adfddd5a51..bf1fd6f789 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -690,7 +690,7 @@ public abstract class DrillRelOptUtil {
}
}
- public static DrillTable getDrillTable(final TableScan scan) {
+ public static DrillTable getDrillTable(RelNode scan) {
DrillTable drillTable = scan.getTable().unwrap(DrillTable.class);
if (drillTable == null) {
DrillTranslatableTable transTable = scan.getTable().unwrap(DrillTranslatableTable.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index c1bcf68616..ec8899a839 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -188,7 +188,7 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
if (indexDesc.getCollation() != null &&
!settings.isIndexForceSortNonCovering()) {
collation = IndexPlanUtils.buildCollationNonCoveringIndexScan(indexDesc, indexScanRowType, dbscanRowType, indexContext);
- if (restrictedScanTraitSet.contains(RelCollationTraitDef.INSTANCE)) { // replace existing trait
+ if (restrictedScanTraitSet.getTrait(RelCollationTraitDef.INSTANCE) != null) { // replace existing trait
restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).replace(collation);
} else { // add new one
restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).plus(collation);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java
new file mode 100644
index 0000000000..d86b808f5b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModify.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.logical.data.InsertWriter;
+import org.apache.drill.common.logical.data.LogicalOperator;
+
+import java.util.List;
+
+public class DrillTableModify extends TableModify implements DrillRel {
+
+ protected DrillTableModify(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+ Prepare.CatalogReader catalogReader, RelNode input, Operation operation,
+ List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traitSet, table, catalogReader, input, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator childOp = implementor.visitChild(this, 0, getInput());
+
+ InsertWriter logicalOperators = new InsertWriter(getTable());
+ logicalOperators.setInput(childOp);
+ return logicalOperators;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillTableModify(getCluster(), traitSet, getTable(), getCatalogReader(),
+ inputs.get(0), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ double rowCount = mq.getRowCount(this);
+ double inputRowCount = mq.getRowCount(getInput());
+ double dIo = inputRowCount + 1; // ensure non-zero cost
+ return planner.getCostFactory().makeCost(rowCount, 0, dIo).multiplyBy(10);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java
new file mode 100644
index 0000000000..7a177c2e1c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTableModifyRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+
+public class DrillTableModifyRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillTableModifyRule();
+
+ private final RelTrait out;
+
+ private DrillTableModifyRule() {
+ super(RelOptHelper.any(LogicalTableModify.class, Convention.NONE),
+ DrillRelFactories.LOGICAL_BUILDER, "DrillTableModifyRule");
+ this.out = DrillRel.DRILL_LOGICAL;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ TableModify tableModify = call.rel(0);
+ RelNode input = tableModify.getInput();
+ RelTraitSet traits = tableModify.getTraitSet().plus(out);
+ RelNode convertedInput = convert(input, input.getTraitSet().plus(out).simplify());
+
+ call.transformTo(new DrillTableModify(
+ tableModify.getCluster(), traits, tableModify.getTable(),
+ tableModify.getCatalogReader(), convertedInput, tableModify.getOperation(),
+ tableModify.getUpdateColumnList(), tableModify.getSourceExpressionList(), tableModify.isFlattened()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java
new file mode 100644
index 0000000000..1f77c79e13
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ModifyTableEntry.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+
+import java.io.IOException;
+
+public interface ModifyTableEntry {
+
+ Writer getWriter(PhysicalOperator child) throws IOException;
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
index 937c2e5837..6d9fbc3971 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
@@ -36,7 +36,7 @@ public class ScreenPrule extends Prule{
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillScreenRelBase screen = (DrillScreenRelBase) call.rel(0);
+ final DrillScreenRelBase screen = call.rel(0);
final RelNode input = call.rel(1);
final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
@@ -44,6 +44,4 @@ public class ScreenPrule extends Prule{
DrillScreenRelBase newScreen = new ScreenPrel(screen.getCluster(), screen.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput);
call.transformTo(newScreen);
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java
new file mode 100644
index 0000000000..8e7e917641
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.record.BatchSchema;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class TableModifyPrel extends TableModify implements Prel {
+
+ protected TableModifyPrel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+ Prepare.CatalogReader catalogReader, RelNode input, Operation operation,
+ List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traitSet, table, catalogReader, input, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new TableModifyPrel(getCluster(), traitSet, getTable(), getCatalogReader(),
+ inputs.get(0), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ Prel child = (Prel) this.getInput();
+
+ List<String> tablePath = getTable().getQualifiedName();
+ List<String> schemaPath = tablePath.size() > 1
+ ? tablePath.subList(0, tablePath.size() - 1)
+ : Collections.emptyList();
+ String tableName = tablePath.get(tablePath.size() - 1);
+ SchemaPlus schema = ((CalciteCatalogReader) getTable().getRelOptSchema()).getRootSchema().plus();
+
+ ModifyTableEntry modifyTableEntry = SchemaUtilites.resolveToDrillSchema(schema, schemaPath)
+ .modifyTable(tableName);
+
+ PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+ PhysicalOperator p = modifyTableEntry.getWriter(childPOP);
+ return creator.addMetadata(this, p);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitTableModify(this, value);
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+ return BatchSchema.SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getEncoding() {
+ return BatchSchema.SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+ @NotNull
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getInput());
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ double rowCount = mq.getRowCount(this);
+ double inputRowCount = mq.getRowCount(getInput());
+ double dIo = inputRowCount + 1; // ensure non-zero cost
+ return planner.getCostFactory().makeCost(rowCount, 0, dIo).multiplyBy(10);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
index 937c2e5837..2b73d21728 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TableModifyPrule.java
@@ -17,33 +17,31 @@
*/
package org.apache.drill.exec.planner.physical;
-import org.apache.drill.exec.planner.common.DrillScreenRelBase;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillScreenRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillTableModify;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
-public class ScreenPrule extends Prule{
- public static final RelOptRule INSTANCE = new ScreenPrule();
-
+public class TableModifyPrule extends Prule {
+ public static final RelOptRule INSTANCE = new TableModifyPrule();
- public ScreenPrule() {
- super(RelOptHelper.some(DrillScreenRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.ScreenPrule");
+ private TableModifyPrule() {
+ super(RelOptHelper.some(DrillTableModify.class, RelOptHelper.any(RelNode.class)), "TableModifyPrule");
}
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillScreenRelBase screen = (DrillScreenRelBase) call.rel(0);
- final RelNode input = call.rel(1);
-
- final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
- final RelNode convertedInput = convert(input, traits);
- DrillScreenRelBase newScreen = new ScreenPrel(screen.getCluster(), screen.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput);
- call.transformTo(newScreen);
- }
+ DrillTableModify tableModify = call.rel(0);
+ RelNode input = tableModify.getInput();
+ RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ RelNode convertedInput = convert(input, traits);
+ call.transformTo(new TableModifyPrel(
+ tableModify.getCluster(), traits, tableModify.getTable(),
+ tableModify.getCatalogReader(), convertedInput, tableModify.getOperation(),
+ tableModify.getUpdateColumnList(), tableModify.getSourceExpressionList(), tableModify.isFlattened()));
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
index dd11d03cca..f268da3ffc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -86,4 +87,9 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements
public RETURN visitLeaf(LeafPrel prel, EXTRA value) throws EXCEP {
return visitPrel(prel, value);
}
+
+ @Override
+ public RETURN visitTableModify(TableModifyPrel prel, EXTRA value) throws EXCEP {
+ return visitPrel(prel, value);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index a97c10f507..b956b0b35a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -44,4 +45,5 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
RETURN visitLateral(LateralJoinPrel prel, EXTRA value) throws EXCEP;
RETURN visitLeaf(LeafPrel prel, EXTRA value) throws EXCEP;
+ RETURN visitTableModify(TableModifyPrel prel, EXTRA value) throws EXCEP;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
index 3dfc030f2d..fa90c5fff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.TableModifyPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
@@ -253,4 +254,11 @@ public class PrelVisualizerVisitor
visitPrel(prel, value);
return null;
}
+
+ @Override
+ public Void visitTableModify(TableModifyPrel prel, VisualizationState value) throws Exception {
+ visitBasePrel(prel, value);
+ endNode(prel, value);
+ return null;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index e71ff894a5..1d75e6c0aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DescribeSchemaHandler;
import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
+import org.apache.drill.exec.planner.sql.handlers.InsertHandler;
import org.apache.drill.exec.planner.sql.handlers.MetastoreAnalyzeTableHandler;
import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
import org.apache.drill.exec.planner.sql.handlers.ResetOptionHandler;
@@ -50,7 +51,6 @@ import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
-import org.apache.drill.exec.planner.sql.parser.DrillSqlSetOption;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
import org.apache.drill.exec.testing.ControlsInjector;
@@ -215,16 +215,11 @@ public class DrillSqlWorker {
context.setSQLStatementType(SqlStatementType.EXPLAIN);
break;
case SET_OPTION:
- if (sqlNode instanceof DrillSqlSetOption) {
- handler = new SetOptionHandler(context);
- context.setSQLStatementType(SqlStatementType.SETOPTION);
- break;
- }
- if (sqlNode instanceof DrillSqlResetOption) {
- handler = new ResetOptionHandler(context);
- context.setSQLStatementType(SqlStatementType.SETOPTION);
- break;
- }
+ handler = sqlNode instanceof DrillSqlResetOption
+ ? new ResetOptionHandler(context)
+ : new SetOptionHandler(context);
+ context.setSQLStatementType(SqlStatementType.SETOPTION);
+ break;
case DESCRIBE_TABLE:
if (sqlNode instanceof DrillSqlDescribeTable) {
handler = new DescribeTableHandler(config);
@@ -246,6 +241,10 @@ public class DrillSqlWorker {
handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
context.setSQLStatementType(SqlStatementType.CTAS);
break;
+ case INSERT:
+ handler = new InsertHandler(config, textPlan);
+ context.setSQLStatementType(SqlStatementType.INSERT);
+ break;
case SELECT:
handler = new DefaultSqlHandler(config, textPlan);
context.setSQLStatementType(SqlStatementType.SELECT);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java
new file mode 100644
index 0000000000..c3d23e5b9f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/InsertHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Constructs plan to be executed for inserting data into the table.
+ */
+public class InsertHandler extends DefaultSqlHandler {
+ private static final Logger logger = LoggerFactory.getLogger(InsertHandler.class);
+
+ public InsertHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
+ super(config, textPlan);
+ }
+
+ protected ConvertedRelNode validateAndConvert(SqlNode sqlNode)
+ throws ForemanSetupException, RelConversionException, ValidationException {
+ ConvertedRelNode convertedRelNode = super.validateAndConvert(sqlNode);
+
+ String storageName = SchemaUtilites.getSchemaPathAsList(
+ convertedRelNode.getConvertedNode().getTable().getQualifiedName().iterator().next()).iterator().next();
+ try {
+ if (!context.getStorage().getPlugin(storageName).supportsInsert()) {
+ throw UserException.validationError()
+ .message("Storage plugin [%s] is immutable or doesn't support inserts", storageName)
+ .build(logger);
+ }
+ } catch (StoragePluginRegistry.PluginException e) {
+ throw new DrillRuntimeException(e);
+ }
+
+ return convertedRelNode;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
index c2dd077fbe..b6614e9482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
@@ -24,7 +24,9 @@ import java.util.Optional;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.tools.RuleSet;
import org.apache.drill.common.util.function.CheckedSupplier;
import org.apache.drill.exec.ops.QueryContext;
@@ -72,16 +74,34 @@ public class SqlHandlerConfig {
@Override
public RelNode visit(TableScan scan) {
+ collectPlugins(scan);
+ return scan;
+ }
+
+ @Override
+ public RelNode visit(LogicalTableModify modify) {
+ collectPlugins(modify);
+ return visitChildren(modify);
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof TableModify) {
+ collectPlugins(other);
+ }
+ return super.visit(other);
+ }
+
+ private void collectPlugins(RelNode relNode) {
String pluginName = SchemaUtilites.getSchemaPathAsList(
- scan.getTable().getQualifiedName().iterator().next()).iterator().next();
+ relNode.getTable().getQualifiedName().iterator().next()).iterator().next();
CheckedSupplier<StoragePlugin, StoragePluginRegistry.PluginException> pluginsProvider =
() -> storagePlugins.getPlugin(pluginName);
- StoragePlugin storagePlugin = Optional.ofNullable(DrillRelOptUtil.getDrillTable(scan))
+ StoragePlugin storagePlugin = Optional.ofNullable(DrillRelOptUtil.getDrillTable(relNode))
.map(DrillTable::getPlugin)
.orElseGet(pluginsProvider);
plugins.add(storagePlugin);
- return scan;
}
public List<StoragePlugin> getPlugins() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 28aee4e682..b8d68537b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.ModifyTableEntry;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
import org.apache.drill.exec.store.table.function.TableParamDef;
@@ -188,6 +189,18 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
.build(logger);
}
+ /**
+ * Returns table entry using table name to insert records into the table.
+ *
+ * @param tableName : new table name.
+ * @return insert table entry
+ */
+ public ModifyTableEntry modifyTable(String tableName) {
+ throw UserException.unsupportedError()
+ .message("Modifying tables is not supported in schema [%s]", getSchemaPath())
+ .build(logger);
+ }
+
/**
* Creates table entry using table name and list of partition columns if any.
* Table folder and files will be created using persistent storage strategy.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 485e25b158..2015002f54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -59,6 +59,11 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
return false;
}
+ @Override
+ public boolean supportsInsert() {
+ return false;
+ }
+
/**
* @deprecated Marking for deprecation in next major version release. Use
* {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index d8a1a0bfc4..db4ea3a659 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -55,6 +55,11 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
*/
boolean supportsWrite();
+ /**
+ * Indicates if Drill can insert to a table to this plugin.
+ */
+ boolean supportsInsert();
+
/**
* Method returns a Jackson serializable object that extends a StoragePluginConfig.
*
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java b/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java
new file mode 100644
index 0000000000..eb7298c7ea
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/InsertWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("insert_writer")
+public class InsertWriter extends Writer {
+
+ @JsonCreator
+ public InsertWriter(@JsonProperty("createTableEntry") Object createTableEntry) {
+ super(createTableEntry);
+ }
+}