Posted to commits@drill.apache.org by vo...@apache.org on 2022/05/12 07:08:24 UTC
[drill] branch master updated: DRILL-8216: Use EVF-based JSON reader for Values operator
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 3dc4f25562 DRILL-8216: Use EVF-based JSON reader for Values operator
3dc4f25562 is described below
commit 3dc4f2556267e300bbca75b109684e06b16470d2
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Sun May 8 13:54:45 2022 +0300
DRILL-8216: Use EVF-based JSON reader for Values operator
---
.../drill/exec/store/hive/HiveUtilities.java | 87 +---------------------
.../exec/store/jdbc/TestJdbcWriterWithH2.java | 32 ++++----
.../exec/store/jdbc/TestJdbcWriterWithMySQL.java | 32 ++++----
.../store/jdbc/TestJdbcWriterWithPostgres.java | 32 ++++----
.../apache/drill/exec/physical/config/Values.java | 20 +++--
.../physical/impl/scan/v3/schema/SchemaUtils.java | 87 +++++++++++++++++++++-
.../physical/impl/values/ValuesBatchCreator.java | 48 +++++++++---
.../exec/planner/common/DrillValuesRelBase.java | 27 ++++---
.../drill/exec/planner/logical/DrillValuesRel.java | 6 +-
.../drill/exec/planner/physical/ValuesPrel.java | 22 +++++-
.../sql/handlers/MetastoreAnalyzeTableHandler.java | 2 +-
.../store/easy/json/JsonStreamBatchReader.java | 71 ++++++++++++++++++
.../org/apache/drill/TestProjectWithFunctions.java | 2 +-
.../java/org/apache/drill/exec/sql/TestValues.java | 4 +-
.../apache/drill/common/logical/data/Values.java | 15 ++--
15 files changed, 304 insertions(+), 183 deletions(-)
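For context, the updated JDBC-writer and SQL tests below show the user-visible effect of this commit: integer literals in a VALUES clause now surface as INT rather than BIGINT, because the EVF JSON reader receives the planner-provided schema instead of inferring every integer as BIGINT. A minimal sketch of the verification pattern those tests share; the class name, query, and import paths are assumptions for illustration, not part of the commit:

    import org.apache.drill.common.types.TypeProtos.DataMode;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.rowSet.DirectRowSet;
    import org.apache.drill.exec.physical.rowSet.RowSet;
    import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.test.ClusterFixture;
    import org.apache.drill.test.ClusterTest;
    import org.apache.drill.test.rowSet.RowSetUtilities;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class TestValuesTypesSketch extends ClusterTest {

      @BeforeClass
      public static void setup() throws Exception {
        startCluster(ClusterFixture.builder(dirTestWatcher));
      }

      @Test
      public void valuesLiteralsAreInt() throws Exception {
        DirectRowSet results = queryBuilder()
            .sql("SELECT * FROM (VALUES (1, 2), (3, 4)) t(id, name)")
            .rowSet();
        // After this commit: INT columns and int row values, not BIGINT/longs.
        TupleMetadata expectedSchema = new SchemaBuilder()
            .add("id", MinorType.INT, DataMode.OPTIONAL)
            .add("name", MinorType.INT, DataMode.OPTIONAL)
            .buildSchema();
        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
            .addRow(1, 2)
            .addRow(3, 4)
            .build();
        RowSetUtilities.verify(expected, results);
      }
    }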
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 531284d156..46c62b2f54 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -18,17 +18,9 @@
package org.apache.drill.exec.store.hive;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.MapColumnMetadata;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
-import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import io.netty.buffer.DrillBuf;
@@ -771,81 +763,6 @@ public class HiveUtilities {
return new HiveTableWrapper.HivePartitionWrapper(new HivePartition(partition, listIndex));
}
- /**
- * Converts specified {@code RelDataType relDataType} into {@link ColumnMetadata}.
- * For the case when specified relDataType is struct, map with recursively converted children
- * will be created.
- *
- * @param name filed name
- * @param relDataType filed type
- * @return {@link ColumnMetadata} which corresponds to specified {@code RelDataType relDataType}
- */
- public static ColumnMetadata getColumnMetadata(String name, RelDataType relDataType) {
- switch (relDataType.getSqlTypeName()) {
- case ARRAY:
- return getArrayMetadata(name, relDataType);
- case MAP:
- case OTHER:
- throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
- default:
- if (relDataType.isStruct()) {
- return getStructMetadata(name, relDataType);
- } else {
- return new PrimitiveColumnMetadata(
- MaterializedField.create(name,
- TypeInferenceUtils.getDrillMajorTypeFromCalciteType(relDataType)));
- }
- }
- }
-
- /**
- * Returns {@link ColumnMetadata} instance which corresponds to specified array {@code RelDataType relDataType}.
- *
- * @param name name of the filed
- * @param relDataType the source of type information to construct the schema
- * @return {@link ColumnMetadata} instance
- */
- private static ColumnMetadata getArrayMetadata(String name, RelDataType relDataType) {
- RelDataType componentType = relDataType.getComponentType();
- ColumnMetadata childColumnMetadata = getColumnMetadata(name, componentType);
- switch (componentType.getSqlTypeName()) {
- case ARRAY:
- // for the case when nested type is array, it should be placed into repeated list
- return MetadataUtils.newRepeatedList(name, childColumnMetadata);
- case MAP:
- case OTHER:
- throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
- default:
- if (componentType.isStruct()) {
- // for the case when nested type is struct, it should be placed into repeated map
- return MetadataUtils.newMapArray(name, childColumnMetadata.tupleSchema());
- } else {
- // otherwise creates column metadata with repeated data mode
- return new PrimitiveColumnMetadata(
- MaterializedField.create(name,
- Types.overrideMode(
- TypeInferenceUtils.getDrillMajorTypeFromCalciteType(componentType),
- DataMode.REPEATED)));
- }
- }
- }
-
- /**
- * Returns {@link MapColumnMetadata} column metadata created based on specified {@code RelDataType relDataType} with
- * converted to {@link ColumnMetadata} {@code relDataType}'s children.
- *
- * @param name name of the filed
- * @param relDataType {@link RelDataType} the source of the children for resulting schema
- * @return {@link MapColumnMetadata} column metadata
- */
- private static MapColumnMetadata getStructMetadata(String name, RelDataType relDataType) {
- TupleMetadata mapSchema = new TupleSchema();
- for (RelDataTypeField relDataTypeField : relDataType.getFieldList()) {
- mapSchema.addColumn(getColumnMetadata(relDataTypeField.getName(), relDataTypeField.getType()));
- }
- return MetadataUtils.newMap(name, mapSchema);
- }
-
/**
* Converts specified {@code FieldSchema column} into {@link ColumnMetadata}.
* For the case when specified relDataType is struct, map with recursively converted children
@@ -857,7 +774,7 @@ public class HiveUtilities {
*/
public static ColumnMetadata getColumnMetadata(HiveToRelDataTypeConverter dataTypeConverter, FieldSchema column) {
RelDataType relDataType = dataTypeConverter.convertToNullableRelDataType(column);
- return getColumnMetadata(column.getName(), relDataType);
+ return SchemaUtils.getColumnMetadata(column.getName(), relDataType);
}
}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
index ce112b84e8..0335e8a58b 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
@@ -117,13 +117,13 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -190,13 +190,13 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("My id", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("My name", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("My id", MinorType.INT, DataMode.OPTIONAL)
+ .add("My name", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -260,13 +260,13 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -290,13 +290,13 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
index 6076267886..4b6b38d0d0 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
@@ -145,13 +145,13 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -174,13 +174,13 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -203,13 +203,13 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("My id", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("My name", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("My id", MinorType.INT, DataMode.OPTIONAL)
+ .add("My name", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -379,13 +379,13 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
index 2be9c3a2e3..8c30691976 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
@@ -143,13 +143,13 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -216,13 +216,13 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -297,13 +297,13 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("My id", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("My name", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("My id", MinorType.INT, DataMode.OPTIONAL)
+ .add("My name", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
@@ -366,13 +366,13 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
DirectRowSet results = queryBuilder().sql(testQuery).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("ID", MinorType.BIGINT, DataMode.OPTIONAL)
- .add("NAME", MinorType.BIGINT, DataMode.OPTIONAL)
+ .add("ID", MinorType.INT, DataMode.OPTIONAL)
+ .add("NAME", MinorType.INT, DataMode.OPTIONAL)
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1L, 2L)
- .addRow(3L, 4L)
+ .addRow(1, 2)
+ .addRow(3, 4)
.build();
RowSetUtilities.verify(expected, results);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Values.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Values.java
index d4bb862319..fd7a897d46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Values.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Values.java
@@ -21,7 +21,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.Leaf;
@@ -31,21 +30,32 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
public class Values extends AbstractBase implements Leaf {
public static final String OPERATOR_TYPE = "VALUES";
- private final JSONOptions content;
+ private final String content;
+
+ private final TupleMetadata schema;
@JsonCreator
- public Values(@JsonProperty("content") JSONOptions content){
+ public Values(
+ @JsonProperty("content") String content,
+ @JsonProperty("schema") TupleMetadata schema) {
this.content = content;
+ this.schema = schema;
}
- public JSONOptions getContent(){
+ public String getContent() {
return content;
}
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
return physicalVisitor.visitValues(this, value);
@@ -54,7 +64,7 @@ public class Values extends AbstractBase implements Leaf {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
assert children.isEmpty();
- return new Values(content);
+ return new Values(content, schema);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaUtils.java
index fa8d8fdb28..802a0b32b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaUtils.java
@@ -17,11 +17,18 @@
*/
package org.apache.drill.exec.physical.impl.scan.v3.schema;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
import org.apache.drill.exec.record.metadata.Propertied;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
@@ -96,7 +103,7 @@ public class SchemaUtils {
/**
* Perform the column-level projection as described in
- * {@link #isConsistent(RequestedColumn, ColumnMetadata)}, and raise a
+ * {@link #isConsistent(ProjectedColumn, ColumnMetadata)}, and raise a
* {@code UserException} if the column is not consistent with projection.
*
* @param colReq the column-level projection description
@@ -234,4 +241,80 @@ public class SchemaUtils {
dest.setProperty(ScanProjectionParser.PROJECTION_TYPE_PROP, value);
}
}
+
+ /**
+ * Converts the specified {@code RelDataType relDataType} into {@link ColumnMetadata}.
+ * When the specified relDataType is a struct, a map with recursively converted
+ * children is created.
+ *
+ * @param name field name
+ * @param relDataType field type
+ * @return {@link ColumnMetadata} which corresponds to the specified {@code RelDataType relDataType}
+ */
+ public static ColumnMetadata getColumnMetadata(String name, RelDataType relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case ARRAY:
+ return getArrayMetadata(name, relDataType);
+ case MAP:
+ case OTHER:
+ throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
+ default:
+ if (relDataType.isStruct()) {
+ return getStructMetadata(name, relDataType);
+ } else {
+ return new PrimitiveColumnMetadata(
+ MaterializedField.create(name,
+ TypeInferenceUtils.getDrillMajorTypeFromCalciteType(relDataType)));
+ }
+ }
+ }
+
+ /**
+ * Returns a {@link ColumnMetadata} instance which corresponds to the specified array {@code RelDataType relDataType}.
+ *
+ * @param name name of the field
+ * @param relDataType the source of type information to construct the schema
+ * @return {@link ColumnMetadata} instance
+ */
+ private static ColumnMetadata getArrayMetadata(String name, RelDataType relDataType) {
+ RelDataType componentType = relDataType.getComponentType();
+ ColumnMetadata childColumnMetadata = getColumnMetadata(name, componentType);
+ switch (componentType.getSqlTypeName()) {
+ case ARRAY:
+ // for the case when nested type is array, it should be placed into repeated list
+ return MetadataUtils.newRepeatedList(name, childColumnMetadata);
+ case MAP:
+ case OTHER:
+ throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
+ default:
+ if (componentType.isStruct()) {
+ // for the case when nested type is struct, it should be placed into repeated map
+ return MetadataUtils.newMapArray(name, childColumnMetadata.tupleSchema());
+ } else {
+ // otherwise creates column metadata with repeated data mode
+ return new PrimitiveColumnMetadata(
+ MaterializedField.create(name,
+ Types.overrideMode(
+ TypeInferenceUtils.getDrillMajorTypeFromCalciteType(componentType),
+ TypeProtos.DataMode.REPEATED)));
+ }
+ }
+ }
+
+ /**
+ * Returns {@link MapColumnMetadata} created from the specified {@code RelDataType relDataType},
+ * with {@code relDataType}'s children converted to {@link ColumnMetadata}.
+ *
+ * @param name name of the field
+ * @param relDataType {@link RelDataType} the source of the children for resulting schema
+ * @return {@link MapColumnMetadata} column metadata
+ */
+ private static MapColumnMetadata getStructMetadata(String name, RelDataType relDataType) {
+ TupleMetadata mapSchema = new TupleSchema();
+ relDataType.getFieldList().stream()
+ .map(field -> getColumnMetadata(field.getName(), field.getType()))
+ .filter(metadata -> metadata.type() != MinorType.LATE)
+ .forEach(mapSchema::addColumn);
+ return MetadataUtils.newMap(name, mapSchema);
+ }
}
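With this move, SchemaUtils.getColumnMetadata becomes the shared Calcite-to-Drill schema bridge used by HiveUtilities above and ValuesPrel below. A hypothetical usage sketch — Calcite's JavaTypeFactoryImpl is used here only as a convenient way to build RelDataType instances and is not something this commit touches:

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
    import org.apache.drill.exec.record.metadata.ColumnMetadata;

    public class SchemaUtilsSketch {
      public static void main(String[] args) {
        RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

        // Scalar: INTEGER maps to Drill INT column metadata.
        ColumnMetadata intCol = SchemaUtils.getColumnMetadata(
            "id", typeFactory.createSqlType(SqlTypeName.INTEGER));

        // Array: INTEGER ARRAY maps to a repeated INT column (see getArrayMetadata above).
        RelDataType arrayType = typeFactory.createArrayType(
            typeFactory.createSqlType(SqlTypeName.INTEGER), -1);
        ColumnMetadata arrayCol = SchemaUtils.getColumnMetadata("ids", arrayType);

        System.out.println(intCol + " / " + arrayCol);
      }
    }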
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index 6b0bb41819..1f278c9fbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -17,27 +17,57 @@
*/
package org.apache.drill.exec.physical.impl.values;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Values;
import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.easy.json.JSONRecordReader;
+import org.apache.drill.exec.store.easy.json.JsonStreamBatchReader;
public class ValuesBatchCreator implements BatchCreator<Values> {
+
@Override
- public ScanBatch getBatch(ExecutorFragmentContext context, Values config, List<RecordBatch> children)
- throws ExecutionSetupException {
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ Values subScan, List<RecordBatch> children) throws ExecutionSetupException {
assert children.isEmpty();
+ try {
+ ManagedScanFramework.ScanFrameworkBuilder builder = createBuilder(subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private ManagedScanFramework.ScanFrameworkBuilder createBuilder(Values subScan) {
+ ManagedScanFramework.ScanFrameworkBuilder builder = new ManagedScanFramework.ScanFrameworkBuilder();
+ builder.setUserName(subScan.getUserName());
+ builder.providedSchema(subScan.getSchema());
+
+ ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(
+ getRecordReader(new ByteArrayInputStream(subScan.getContent().getBytes()))).iterator());
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
+ }
- JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(),
- null, Collections.singletonList(SchemaPath.STAR_COLUMN));
- return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
+ private static ManagedReader<SchemaNegotiator> getRecordReader(InputStream source) {
+ return new JsonStreamBatchReader(source);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillValuesRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillValuesRelBase.java
index 933dc357ce..5042c432fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillValuesRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillValuesRelBase.java
@@ -21,6 +21,7 @@ import static org.apache.drill.exec.planner.logical.DrillOptiq.isLiteralNull;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -32,16 +33,14 @@ import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.util.NlsString;
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.util.GuavaUtils;
-import org.apache.drill.exec.vector.complex.fn.ExtendedJsonOutput;
+import org.apache.drill.exec.vector.complex.fn.BasicJsonOutput;
import org.apache.drill.exec.vector.complex.fn.JsonOutput;
import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.joda.time.Period;
-import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.TokenBuffer;
@@ -53,7 +52,7 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
private static final ObjectMapper MAPPER = new ObjectMapper();
- protected final JSONOptions content;
+ protected final String content;
public DrillValuesRelBase(RelOptCluster cluster, RelDataType rowType, List<? extends List<RexLiteral>> tuples, RelTraitSet traits) {
this(cluster, rowType, tuples, traits, convertToJsonOptions(rowType, tuples));
@@ -67,15 +66,15 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
RelDataType rowType,
List<? extends List<RexLiteral>> tuples,
RelTraitSet traits,
- JSONOptions content) {
+ String content) {
super(cluster, rowType, GuavaUtils.convertToNestedUnshadedImmutableList(tuples), traits);
this.content = content;
}
/**
- * @return values content represented as json
+ * @return values content represented as json string
*/
- public JSONOptions getContent() {
+ public String getContent() {
return content;
}
@@ -87,9 +86,9 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
* @param tuples list of constant values in a row-expression
* @return json representation of tuples
*/
- private static JSONOptions convertToJsonOptions(RelDataType rowType, List<? extends List<RexLiteral>> tuples) {
+ private static String convertToJsonOptions(RelDataType rowType, List<? extends List<RexLiteral>> tuples) {
try {
- return new JSONOptions(convertToJsonNode(rowType, tuples), JsonLocation.NA);
+ return MAPPER.writeValueAsString(convertToJsonNode(rowType, tuples));
} catch (IOException e) {
throw new DrillRuntimeException("Failure while attempting to encode Values in JSON.", e);
}
@@ -97,7 +96,7 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
private static JsonNode convertToJsonNode(RelDataType rowType, List<? extends List<RexLiteral>> tuples) throws IOException {
TokenBuffer out = new TokenBuffer(MAPPER.getFactory().getCodec(), false);
- JsonOutput json = new ExtendedJsonOutput(out);
+ JsonOutput json = new BasicJsonOutput(out);
json.writeStartArray();
String[] fields = rowType.getFieldNames().toArray(new String[rowType.getFieldCount()]);
@@ -122,7 +121,7 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
if (isLiteralNull(literal)) {
out.writeBigIntNull();
} else {
- out.writeBigInt((((BigDecimal) literal.getValue()).setScale(0, BigDecimal.ROUND_HALF_UP)).longValue());
+ out.writeBigInt((((BigDecimal) literal.getValue()).setScale(0, RoundingMode.HALF_UP)).longValue());
}
return;
@@ -167,7 +166,7 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
if (isLiteralNull(literal)) {
out.writeIntNull();
} else {
- out.writeInt((((BigDecimal) literal.getValue()).setScale(0, BigDecimal.ROUND_HALF_UP)).intValue());
+ out.writeInt((((BigDecimal) literal.getValue()).setScale(0, RoundingMode.HALF_UP)).intValue());
}
return;
@@ -246,9 +245,9 @@ public abstract class DrillValuesRelBase extends Values implements DrillRelNode
out.writeIntervalNull();
} else {
long millis = ((BigDecimal) (literal.getValue())).longValue();
- int days = (int) (millis / DateTimeConstants.MILLIS_PER_DAY);
+ long days = millis / DateTimeConstants.MILLIS_PER_DAY;
millis = millis - (days * DateTimeConstants.MILLIS_PER_DAY);
- out.writeInterval(new Period().plusDays(days).plusMillis((int) millis));
+ out.writeInterval(new Period().plusDays((int) days).plusMillis((int) millis));
}
return;
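The days/millis rewrite in the interval branch above is an integer-overflow fix, not a cosmetic one: with int days, the expression days * DateTimeConstants.MILLIS_PER_DAY is evaluated in 32-bit arithmetic and wraps once the interval exceeds 24 days (25 * 86,400,000 > Integer.MAX_VALUE). A self-contained sketch of the failure mode, using only the JDK with the Joda constant inlined:

    public class IntervalOverflowDemo {
      private static final int MILLIS_PER_DAY = 86_400_000; // DateTimeConstants.MILLIS_PER_DAY

      public static void main(String[] args) {
        long millis = 26L * MILLIS_PER_DAY;          // an interval of exactly 26 days
        int days = (int) (millis / MILLIS_PER_DAY);  // 26

        // Old code: int * int wraps before widening to long.
        long broken = millis - (days * MILLIS_PER_DAY);        // 4294967296, not 0
        // Fixed code: the multiplication happens in 64-bit arithmetic.
        long fixed = millis - ((long) days * MILLIS_PER_DAY);  // 0, as expected

        System.out.println("broken = " + broken + ", fixed = " + fixed);
      }
    }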
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
index 492e6a39fe..fc444e6a17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.planner.logical;
import java.util.List;
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
@@ -39,7 +38,8 @@ public class DrillValuesRel extends DrillValuesRelBase implements DrillRel {
super(cluster, rowType, tuples, traits);
}
- public DrillValuesRel(RelOptCluster cluster, RelDataType rowType, List<? extends List<RexLiteral>> tuples, RelTraitSet traits, JSONOptions content) {
+ public DrillValuesRel(RelOptCluster cluster, RelDataType rowType, List<? extends List<RexLiteral>> tuples,
+ RelTraitSet traits, String content) {
super(cluster, rowType, tuples, traits, content);
}
@@ -52,7 +52,7 @@ public class DrillValuesRel extends DrillValuesRelBase implements DrillRel {
@Override
public LogicalOperator implement(DrillImplementor implementor) {
return Values.builder()
- .content(content.asNode())
+ .content(content)
.build();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ValuesPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ValuesPrel.java
index e464b1ad22..daed80c589 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ValuesPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ValuesPrel.java
@@ -27,26 +27,30 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
-import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Values;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.planner.common.DrillValuesRelBase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
/**
* Physical Values implementation in Drill.
*/
public class ValuesPrel extends DrillValuesRelBase implements Prel {
- public ValuesPrel(RelOptCluster cluster, RelDataType rowType, List<? extends List<RexLiteral>> tuples, RelTraitSet traits) {
+ public ValuesPrel(RelOptCluster cluster, RelDataType rowType,
+ List<? extends List<RexLiteral>> tuples, RelTraitSet traits) {
super(cluster, rowType, tuples, traits);
}
public ValuesPrel(RelOptCluster cluster,
RelDataType rowType,
List<? extends List<RexLiteral>> tuples, RelTraitSet traits,
- JSONOptions content) {
+ String content) {
super(cluster, rowType, tuples, traits, content);
}
@@ -57,7 +61,17 @@ public class ValuesPrel extends DrillValuesRelBase implements Prel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- return creator.addMetadata(this, new Values(content));
+ return creator.addMetadata(this, new Values(content, getSchema()));
+ }
+
+ private TupleMetadata getSchema() {
+ TupleSchema columnMetadata = new TupleSchema();
+ getRowType().getFieldList().stream()
+ .map(field -> SchemaUtils.getColumnMetadata(field.getName(), field.getType()))
+ .filter(metadata -> metadata.type() != TypeProtos.MinorType.LATE)
+ .forEach(columnMetadata::add);
+
+ return columnMetadata;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
index 4de3f81d9c..07548ae6c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
@@ -350,7 +350,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
RelNode analyzeRel = useStatistics
? new DrillAnalyzeRel(
convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertToRawDrel(relNode), samplePercent)
- : convertToRawDrel(relBuilder.values(new String[]{""}, "").build());
+ : convertToRawDrel(relBuilder.values(convertedRelNode.getRowType()).build());
MetadataControllerContext metadataControllerContext = MetadataControllerContext.builder()
.tableInfo(tableInfo)
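The handler change above swaps a dummy single-row VALUES (one empty VARCHAR field) for an empty Values that keeps the converted node's row type, so downstream operators see the real schema. A hedged illustration of the two Calcite RelBuilder forms involved; the wrapper class is hypothetical:

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.tools.RelBuilder;

    public class ValuesShapesSketch {
      // Before: a single row holding one empty-string field named "".
      static RelNode dummyRow(RelBuilder relBuilder) {
        return relBuilder.values(new String[] {""}, "").build();
      }

      // After: zero rows, but the schema of the converted node is preserved.
      static RelNode emptyWithSchema(RelBuilder relBuilder, RelDataType rowType) {
        return relBuilder.values(rowType).build();
      }
    }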
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStreamBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStreamBatchReader.java
new file mode 100644
index 0000000000..9e0047ae34
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStreamBatchReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
+
+import java.io.InputStream;
+
+/**
+ * EVF-based JSON reader that uses an input stream as its data source.
+ */
+public class JsonStreamBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private JsonLoader jsonLoader;
+ private final InputStream source;
+
+ public JsonStreamBatchReader(InputStream source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+
+ CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext());
+ negotiator.setErrorContext(errorContext);
+
+ // Create the JSON loader (high-level parser).
+ jsonLoader = new JsonLoaderBuilder()
+ .resultSetLoader(negotiator.build())
+ .standardOptions(negotiator.queryOptions())
+ .providedSchema(negotiator.providedSchema())
+ .errorContext(errorContext)
+ .fromStream(source)
+ .build();
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ return jsonLoader.readBatch();
+ }
+
+ @Override
+ public void close() {
+ if (jsonLoader != null) {
+ jsonLoader.close();
+ jsonLoader = null;
+ }
+ }
+
+}
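To see what this reader consumes from the Values operator: DrillValuesRelBase serializes the tuples as a JSON array of objects keyed by field name, and ValuesBatchCreator hands that string to JsonStreamBatchReader as a stream. A hypothetical harness showing the shape — the content literal corresponds to VALUES (1, 'A'), (2, 'B') t(id, name), and in real execution the scan framework, not user code, drives open/next/close:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
    import org.apache.drill.exec.store.easy.json.JsonStreamBatchReader;

    public class ValuesContentSketch {
      public static ManagedReader<SchemaNegotiator> readerFor() {
        String content = "[{\"id\":1,\"name\":\"A\"},{\"id\":2,\"name\":\"B\"}]";
        InputStream source = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
        return new JsonStreamBatchReader(source);
      }
    }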
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectWithFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectWithFunctions.java
index 4f91a91032..690a75b731 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectWithFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectWithFunctions.java
@@ -77,7 +77,7 @@ public class TestProjectWithFunctions extends ClusterTest {
.sqlQuery(sql)
.unOrdered()
.baselineColumns("c", "d")
- .baselineValues(2L, 1L)
+ .baselineValues(2, 1)
.go();
}
} finally {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestValues.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestValues.java
index 0a301ef093..3157dcd7e5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestValues.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestValues.java
@@ -61,8 +61,8 @@ public class TestValues extends ClusterTest {
.sqlQuery("select id, name from (values(1, 'A'), (2, 'B')) t(id, name)")
.unOrdered()
.baselineColumns("id", "name")
- .baselineValues(1L, "A")
- .baselineValues(2L, "B")
+ .baselineValues(1, "A")
+ .baselineValues(2, "B")
.go();
} finally {
client.resetSession(ExecConstants.SLICE_TARGET);
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Values.java b/logical/src/main/java/org/apache/drill/common/logical/data/Values.java
index a62bcfc615..25b760272a 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/Values.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Values.java
@@ -17,29 +17,26 @@
*/
package org.apache.drill.common.logical.data;
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.core.JsonLocation;
-import com.fasterxml.jackson.databind.JsonNode;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@JsonTypeName("values")
public class Values extends SourceOperator {
- private final JSONOptions content;
+ private final String content;
@JsonCreator
- public Values(@JsonProperty("content") JSONOptions content){
+ public Values(@JsonProperty("content") String content){
super();
this.content = content;
Preconditions.checkNotNull(content, "content attribute is required for source operator 'constant'.");
}
- public JSONOptions getContent() {
+ public String getContent() {
return content;
}
@@ -53,10 +50,10 @@ public class Values extends SourceOperator {
}
public static class Builder extends AbstractBuilder<Values>{
- private JSONOptions content;
+ private String content;
- public Builder content(JsonNode n){
- content = new JSONOptions(n, JsonLocation.NA);
+ public Builder content(String content) {
+ this.content = content;
return this;
}