You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/03/03 05:00:31 UTC
[drill] branch master updated: DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1597779 DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)
1597779 is described below
commit 1597779464c988edd6bffeef08c2542173207aa4
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Mar 3 07:00:23 2022 +0200
DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)
---
contrib/storage-cassandra/pom.xml | 2 +-
.../cassandra/CassandraColumnConverterFactory.java | 144 +++++++++++++++++++++
.../CassandraColumnConverterFactoryProvider.java | 25 ++--
.../plan/CassandraEnumerablePrelContext.java | 7 +
.../exec/store/cassandra/BaseCassandraTest.java | 8 +-
.../exec/store/cassandra/CassandraQueryTest.java | 91 +++++++++----
...tCassandraSuit.java => TestCassandraSuite.java} | 6 +-
.../src/test/resources/queries.cql | 18 ++-
contrib/storage-elasticsearch/pom.xml | 52 +++-----
.../ElasticsearchColumnConverterFactory.java | 65 ++++++++++
...lasticsearchColumnConverterFactoryProvider.java | 25 ++--
.../plan/ElasticSearchEnumerablePrelContext.java | 7 +
.../elasticsearch/ElasticComplexTypesTest.java | 9 +-
.../store/elasticsearch/ElasticInfoSchemaTest.java | 8 +-
.../store/elasticsearch/ElasticSearchPlanTest.java | 8 +-
.../elasticsearch/ElasticSearchQueryTest.java | 77 +++++++----
.../elasticsearch/TestElasticsearchSuite.java} | 46 ++++---
.../apache/drill/exec/record/ColumnConverter.java | 2 +-
...xt.java => ColumnConverterFactoryProvider.java} | 22 +---
... => DefaultColumnConverterFactoryProvider.java} | 24 ++--
.../store/enumerable/EnumerableBatchCreator.java | 2 +-
.../exec/store/enumerable/EnumerableGroupScan.java | 13 +-
.../store/enumerable/EnumerableRecordReader.java | 8 +-
.../exec/store/enumerable/EnumerableSubScan.java | 9 +-
.../exec/store/enumerable/plan/EnumerablePrel.java | 7 +-
.../enumerable/plan/EnumerablePrelContext.java | 6 +
26 files changed, 496 insertions(+), 195 deletions(-)
diff --git a/contrib/storage-cassandra/pom.xml b/contrib/storage-cassandra/pom.xml
index 5652b94..5fa0273 100644
--- a/contrib/storage-cassandra/pom.xml
+++ b/contrib/storage-cassandra/pom.xml
@@ -107,7 +107,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
- <include>**/TestCassandraSuit.class</include>
+ <include>**/TestCassandraSuite.class</include>
</includes>
<excludes>
<exclude>**/CassandraComplexTypesTest.java</exclude>
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java
new file mode 100644
index 0000000..861b83f
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.datastax.driver.core.Duration;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.Inet4Address;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class CassandraColumnConverterFactory extends ColumnConverterFactory {
+
+ private static final PeriodFormatter FORMATTER = new PeriodFormatterBuilder()
+ .appendYears()
+ .appendSuffix("Y")
+ .appendMonths()
+ .appendSuffix("M")
+ .appendWeeks()
+ .appendSuffix("W")
+ .appendDays()
+ .appendSuffix("D")
+ .appendHours()
+ .appendSuffix("H")
+ .appendMinutes()
+ .appendSuffix("M")
+ .appendSecondsWithOptionalMillis()
+ .appendSuffix("S")
+ .toFormatter();
+
+ public CassandraColumnConverterFactory(TupleMetadata providedSchema) {
+ super(providedSchema);
+ }
+
+ @Override
+ public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+ switch (readerSchema.type()) {
+ case INTERVAL:
+ return new ColumnConverter.ScalarColumnConverter(value -> {
+ Duration duration = (Duration) value;
+ writer.setPeriod(Period.parse(duration.toString(), FORMATTER));
+ });
+ case BIGINT:
+ return new ColumnConverter.ScalarColumnConverter(value -> {
+ long longValue;
+ if (value instanceof BigInteger) {
+ longValue = ((BigInteger) value).longValue();
+ } else {
+ longValue = (Long) value;
+ }
+ writer.setLong(longValue);
+ });
+ case VARCHAR:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setString(value.toString()));
+ case VARDECIMAL:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal((BigDecimal) value));
+ case VARBINARY:
+ return new ColumnConverter.ScalarColumnConverter(value -> {
+ byte[] bytes;
+ if (value instanceof Inet4Address) {
+ bytes = ((Inet4Address) value).getAddress();
+ } else if (value instanceof UUID) {
+ UUID uuid = (UUID) value;
+ bytes = ByteBuffer.wrap(new byte[16])
+ .order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.getMostSignificantBits())
+ .putLong(uuid.getLeastSignificantBits())
+ .array();
+ } else {
+ bytes = (byte[]) value;
+ }
+ writer.setBytes(bytes, bytes.length);
+ });
+ case BIT:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setBoolean((Boolean) value));
+ default:
+ return super.buildScalar(readerSchema, writer);
+ }
+ }
+
+ @Override
+ protected ColumnConverter getMapConverter(TupleMetadata providedSchema,
+ TupleMetadata readerSchema, TupleWriter tupleWriter) {
+ Map<String, ColumnConverter> converters = StreamSupport.stream(readerSchema.spliterator(), false)
+ .collect(Collectors.toMap(
+ ColumnMetadata::name,
+ columnMetadata ->
+ getConverter(providedSchema, columnMetadata, tupleWriter.column(columnMetadata.name()))));
+
+ return new CassandraMapColumnConverter(this, providedSchema, tupleWriter, converters);
+ }
+
+ private static class CassandraMapColumnConverter extends ColumnConverter.MapColumnConverter {
+
+ public CassandraMapColumnConverter(ColumnConverterFactory factory, TupleMetadata providedSchema, TupleWriter tupleWriter, Map<String, ColumnConverter> converters) {
+ super(factory, providedSchema, tupleWriter, converters);
+ }
+
+ @Override
+ protected TypeProtos.MinorType getScalarMinorType(Class<?> clazz) {
+ if (clazz == Duration.class) {
+ return TypeProtos.MinorType.INTERVAL;
+ } else if (clazz == Inet4Address.class
+ || clazz == UUID.class) {
+ return TypeProtos.MinorType.VARBINARY;
+ } else if (clazz == java.math.BigInteger.class) {
+ return TypeProtos.MinorType.BIGINT;
+ } else if (clazz == org.apache.calcite.avatica.util.ByteString.class) {
+ return TypeProtos.MinorType.VARCHAR;
+ }
+ return super.getScalarMinorType(clazz);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
similarity index 57%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
index 83f8f72..54254eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
@@ -15,22 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.cassandra;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
-import java.util.Map;
+public class CassandraColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+ public static final ColumnConverterFactoryProvider INSTANCE = new CassandraColumnConverterFactoryProvider();
-public interface EnumerablePrelContext {
-
- String generateCode(RelOptCluster cluster, RelNode relNode);
-
- RelNode transformNode(RelNode input);
-
- Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
- String getPlanPrefix();
-
- String getTablePath(RelNode input);
+ @Override
+ public ColumnConverterFactory getFactory(TupleMetadata schema) {
+ return new CassandraColumnConverterFactory(schema);
+ }
}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
index 06cf181..91e5608 100644
--- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.cassandra.CassandraColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
import java.util.Collections;
@@ -78,4 +80,9 @@ public class CassandraEnumerablePrelContext implements EnumerablePrelContext {
List<String> qualifiedName = scan.getTable().getQualifiedName();
return String.join(".", qualifiedName.subList(0, qualifiedName.size() - 1));
}
+
+ @Override
+ public ColumnConverterFactoryProvider factoryProvider() {
+ return CassandraColumnConverterFactoryProvider.INSTANCE;
+ }
}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
index a8eb40f..faae847 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -27,8 +27,8 @@ public class BaseCassandraTest extends ClusterTest {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TestCassandraSuit.initCassandra();
- initCassandraPlugin(TestCassandraSuit.cassandra);
+ TestCassandraSuite.initCassandra();
+ initCassandraPlugin(TestCassandraSuite.cassandra);
}
private static void initCassandraPlugin(CassandraContainer<?> cassandra) throws Exception {
@@ -46,8 +46,8 @@ public class BaseCassandraTest extends ClusterTest {
@AfterClass
public static void tearDownCassandra() {
- if (TestCassandraSuit.isRunningSuite()) {
- TestCassandraSuit.tearDownCluster();
+ if (TestCassandraSuite.isRunningSuite()) {
+ TestCassandraSuite.tearDownCluster();
}
}
}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
index e7aecaa..536a102 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
@@ -18,10 +18,15 @@
package org.apache.drill.exec.store.cassandra;
import org.apache.drill.common.exceptions.UserRemoteException;
+import org.joda.time.Period;
import org.junit.Test;
import java.math.BigDecimal;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.time.LocalDate;
+import java.util.UUID;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -32,21 +37,24 @@ public class CassandraQueryTest extends BaseCassandraTest {
@Test
public void testSelectAll() throws Exception {
testBuilder()
- .sqlQuery("select * from cassandra.test_keyspace.`employee`")
- .unOrdered()
+ .sqlQuery("select * from cassandra.test_keyspace.`employee` order by employee_id")
+ .ordered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
- .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
- .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management")
- .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management")
- .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors Degree", "S", "F", "Store Management")
- .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5, "Graduate Degree", "M", "F", "Store Management")
- .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial College", "M", "M", "Senior Management")
- .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5, "Graduate Degree", "S", "M", "Store Management")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+ "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+ "uuid_field", "varchar_field", "varint_field")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors Degree", "S", "F", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5, "Graduate Degree", "M", "F", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial College", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5, "Graduate Degree", "S", "M", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.go();
}
@@ -76,9 +84,18 @@ public class CassandraQueryTest extends BaseCassandraTest {
.unOrdered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+ "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+ "uuid_field", "varchar_field", "varint_field")
.baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26",
- "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
+ "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management",
+ "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123,
+ new Period(0, 0, 0, 3, 0, 0, 0, 320688000),
+ InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L,
+ getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+ getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+ "abc", 123L)
.go();
}
@@ -144,10 +161,13 @@ public class CassandraQueryTest extends BaseCassandraTest {
.ordered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
- .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
- .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management")
- .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+ "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+ "uuid_field", "varchar_field", "varint_field")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.go();
}
@@ -275,10 +295,13 @@ public class CassandraQueryTest extends BaseCassandraTest {
.baselineColumns("full_name")
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role", "full_name0")
- .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", 123)
- .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", 123)
- .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", 123)
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+ "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+ "uuid_field", "varchar_field", "varint_field", "full_name0")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null, 123)
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null, 123)
.go();
}
@@ -327,9 +350,27 @@ public class CassandraQueryTest extends BaseCassandraTest {
.ordered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+ "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+ "uuid_field", "varchar_field", "varint_field")
.baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, LocalDate.parse("1961-08-26"),
- "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management")
+ "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management",
+ "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123,
+ new Period(0, 0, 0, 3, 0, 0, 0, 320688000),
+ InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L,
+ getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+ getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+ "abc", 123L)
.go();
}
+
+ private static byte[] getUuidBytes(String name) {
+ UUID uuid = UUID.fromString(name);
+ return ByteBuffer.wrap(new byte[16])
+ .order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.getMostSignificantBits())
+ .putLong(uuid.getLeastSignificantBits())
+ .array();
+ }
}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
similarity index 94%
copy from contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
copy to contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
index 9009ba7..98ead70 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
@@ -32,7 +32,7 @@ import org.testcontainers.containers.CassandraContainer;
@Category(SlowTest.class)
@RunWith(Suite.class)
@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class})
-public class TestCassandraSuit extends BaseTest {
+public class TestCassandraSuite extends BaseTest {
protected static CassandraContainer<?> cassandra;
@@ -42,7 +42,7 @@ public class TestCassandraSuit extends BaseTest {
@BeforeClass
public static void initCassandra() {
- synchronized (TestCassandraSuit.class) {
+ synchronized (TestCassandraSuite.class) {
if (initCount.get() == 0) {
startCassandra();
}
@@ -57,7 +57,7 @@ public class TestCassandraSuit extends BaseTest {
@AfterClass
public static void tearDownCluster() {
- synchronized (TestCassandraSuit.class) {
+ synchronized (TestCassandraSuite.class) {
if (initCount.decrementAndGet() == 0 && cassandra != null) {
cassandra.stop();
}
diff --git a/contrib/storage-cassandra/src/test/resources/queries.cql b/contrib/storage-cassandra/src/test/resources/queries.cql
index 527d88c..6382c53 100644
--- a/contrib/storage-cassandra/src/test/resources/queries.cql
+++ b/contrib/storage-cassandra/src/test/resources/queries.cql
@@ -35,13 +35,27 @@ CREATE TABLE test_keyspace.employee (
marital_status text,
gender text,
management_role text,
+ ascii_field ascii,
+ blob_field blob,
+ boolean_field boolean,
+ date_field date,
+ decimal_field decimal,
+ double_field double,
+ duration_field duration,
+ inet_field inet,
+ time_field time,
+ timestamp_field timestamp,
+ timeuuid_field timeuuid,
+ uuid_field uuid,
+ varchar_field varchar,
+ varint_field varint,
PRIMARY KEY (full_name, employee_id)
) WITH CLUSTERING ORDER BY (employee_id ASC);
USE test_keyspace;
-INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
- VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1, '1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role, ascii_field, blob_field, boolean_field, date_field, decimal_field, double_field, duration_field, inet_field, time_field, timestamp_field, timeuuid_field, uuid_field, varchar_field, varint_field)
+ VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1, '1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F', 'Senior Management', 'abc', bigintAsBlob(3), true, '2011-02-03', 123.456, 321.123, 3d89h4m48s, '8.8.8.8', '04:05:00', '2011-02-03 04:05:00', 50554d6e-29bb-11e5-b345-feff819cdc9f, 50554d6e-29bb-11e5-b345-feff819cdc9f, 'abc', 123);
INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
VALUES (2, 'Derrick Whelply', 'Derrick', 'Whelply', 2, 'VP Country Manager',0,1, '1915-07-03', '1994-12-01 00:00:00.0',40000.0 ,1, 'Graduate Degree', 'M', 'M', 'Senior Management');
INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
diff --git a/contrib/storage-elasticsearch/pom.xml b/contrib/storage-elasticsearch/pom.xml
index 40757b7..d645646 100644
--- a/contrib/storage-elasticsearch/pom.xml
+++ b/contrib/storage-elasticsearch/pom.xml
@@ -20,9 +20,6 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <properties>
- <test.elasticsearch.version>7.10.1</test.elasticsearch.version>
- </properties>
<parent>
<artifactId>drill-contrib-parent</artifactId>
<groupId>org.apache.drill.contrib</groupId>
@@ -80,6 +77,18 @@
<version>0.4</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
@@ -88,36 +97,17 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount combine.self="override">1</forkCount>
+ <includes>
+ <include>**/TestElasticsearchSuite.class</include>
+ </includes>
+ <excludes>
+ <exclude>**/ElasticComplexTypesTest.java</exclude>
+ <exclude>**/ElasticInfoSchemaTest.java</exclude>
+ <exclude>**/ElasticSearchPlanTest.java</exclude>
+ <exclude>**/ElasticSearchQueryTest.java</exclude>
+ </excludes>
</configuration>
</plugin>
- <plugin>
- <groupId>com.github.alexcojocaru</groupId>
- <artifactId>elasticsearch-maven-plugin</artifactId>
- <version>6.19</version>
- <configuration>
- <version>${test.elasticsearch.version}</version>
- <clusterName>test</clusterName>
- <transportPort>9300</transportPort>
- <httpPort>9200</httpPort>
- <skip>${skipTests}</skip>
- </configuration>
- <executions>
- <execution>
- <id>start-elasticsearch</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>runforked</goal>
- </goals>
- </execution>
- <execution>
- <id>stop-elasticsearch</id>
- <phase>post-integration-test</phase>
- <goals>
- <goal>stop</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java
new file mode 100644
index 0000000..9e2ee71
--- /dev/null
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class ElasticsearchColumnConverterFactory extends ColumnConverterFactory {
+
+ public ElasticsearchColumnConverterFactory(TupleMetadata providedSchema) {
+ super(providedSchema);
+ }
+
+ @Override
+ public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+ switch (readerSchema.type()) {
+ case BIT:
+ return new ColumnConverter.ScalarColumnConverter(value -> writer.setBoolean((Boolean) value));
+ default:
+ return super.buildScalar(readerSchema, writer);
+ }
+ }
+
+ @Override
+ protected ColumnConverter getMapConverter(TupleMetadata providedSchema,
+ TupleMetadata readerSchema, TupleWriter tupleWriter) {
+ Map<String, ColumnConverter> converters = StreamSupport.stream(readerSchema.spliterator(), false)
+ .collect(Collectors.toMap(
+ ColumnMetadata::name,
+ columnMetadata ->
+ getConverter(providedSchema, columnMetadata, tupleWriter.column(columnMetadata.name()))));
+
+ return new ElasticMapColumnConverter(this, providedSchema, tupleWriter, converters);
+ }
+
+ private static class ElasticMapColumnConverter extends ColumnConverter.MapColumnConverter {
+
+ public ElasticMapColumnConverter(ColumnConverterFactory factory, TupleMetadata providedSchema, TupleWriter tupleWriter, Map<String, ColumnConverter> converters) {
+ super(factory, providedSchema, tupleWriter, converters);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
similarity index 57%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
index 83f8f72..40cce4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
@@ -15,22 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.elasticsearch;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
-import java.util.Map;
+public class ElasticsearchColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+ public static final ColumnConverterFactoryProvider INSTANCE = new ElasticsearchColumnConverterFactoryProvider();
-public interface EnumerablePrelContext {
-
- String generateCode(RelOptCluster cluster, RelNode relNode);
-
- RelNode transformNode(RelNode input);
-
- Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
- String getPlanPrefix();
-
- String getTablePath(RelNode input);
+ @Override
+ public ColumnConverterFactory getFactory(TupleMetadata schema) {
+ return new ElasticsearchColumnConverterFactory(schema);
+ }
}
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
index 29db1ca..e1a92bb 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
@@ -27,6 +27,8 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.elasticsearch.ElasticsearchColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
import java.util.Collections;
@@ -76,4 +78,9 @@ public class ElasticSearchEnumerablePrelContext implements EnumerablePrelContext
public String getTablePath(RelNode input) {
return null;
}
+
+ @Override
+ public ColumnConverterFactoryProvider factoryProvider() {
+ return ElasticsearchColumnConverterFactoryProvider.INSTANCE;
+ }
}
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
index 43d4681..9f4f52d 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
@@ -46,18 +46,17 @@ import static org.apache.drill.test.TestBuilder.mapOf;
public class ElasticComplexTypesTest extends ClusterTest {
- private static final String HOST = "http://localhost:9200";
-
private static final List<String> indexNames = new ArrayList<>();
public static RestHighLevelClient restHighLevelClient;
@BeforeClass
public static void init() throws Exception {
+ TestElasticsearchSuite.initElasticsearch();
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
@@ -69,10 +68,12 @@ public class ElasticComplexTypesTest extends ClusterTest {
for (String indexName : indexNames) {
restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
}
+ TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+ restHighLevelClient = new RestHighLevelClient(
+ RestClient.builder(HttpHost.create(TestElasticsearchSuite.elasticsearch.getHttpHostAddress())));
String indexName = "arr";
indexNames.add(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
index cad11bb..aa4dae4 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
@@ -41,18 +41,17 @@ import java.util.List;
public class ElasticInfoSchemaTest extends ClusterTest {
- private static final String HOST = "http://localhost:9200";
-
private static final List<String> indexNames = new ArrayList<>();
public static RestHighLevelClient restHighLevelClient;
@BeforeClass
public static void init() throws Exception {
+ TestElasticsearchSuite.initElasticsearch();
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
@@ -64,10 +63,11 @@ public class ElasticInfoSchemaTest extends ClusterTest {
for (String indexName : indexNames) {
restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
}
+ TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+ restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
String indexName = "t1";
indexNames.add(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
index 74773af..db81edf 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
@@ -39,18 +39,17 @@ import java.util.Collections;
public class ElasticSearchPlanTest extends ClusterTest {
- private static final String HOST = "http://localhost:9200";
-
public static RestHighLevelClient restHighLevelClient;
private static String indexName;
@BeforeClass
public static void init() throws Exception {
+ TestElasticsearchSuite.initElasticsearch();
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
@@ -60,10 +59,11 @@ public class ElasticSearchPlanTest extends ClusterTest {
@AfterClass
public static void cleanUp() throws IOException {
restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+ TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+ restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
indexName = "nation";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
index 79f9344..374c449 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
+import java.util.Base64;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.containsString;
@@ -46,18 +47,17 @@ import static org.junit.Assert.fail;
public class ElasticSearchQueryTest extends ClusterTest {
- private static final String HOST = "http://localhost:9200";
-
public static RestHighLevelClient restHighLevelClient;
private static String indexName;
@BeforeClass
public static void init() throws Exception {
+ TestElasticsearchSuite.initElasticsearch();
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
@@ -67,10 +67,11 @@ public class ElasticSearchQueryTest extends ClusterTest {
@AfterClass
public static void cleanUp() throws IOException {
restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+ TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+ restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
indexName = "employee";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
@@ -95,6 +96,14 @@ public class ElasticSearchQueryTest extends ClusterTest {
builder.field("marital_status", "S");
builder.field("gender", "F");
builder.field("management_role", "Senior Management");
+ builder.field("binary_field", "Senior Management".getBytes());
+ builder.field("boolean_field", true);
+ builder.timeField("date_field", "2015/01/01 12:10:30");
+ builder.field("byte_field", (byte) 123);
+ builder.field("long_field", 123L);
+ builder.field("float_field", 123F);
+ builder.field("short_field", (short) 123);
+ builder.field("decimal_field", new BigDecimal("123.45"));
builder.endObject();
IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
@@ -307,17 +316,19 @@ public class ElasticSearchQueryTest extends ClusterTest {
.unOrdered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
- .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
- .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management")
- .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management")
- .baselineValues(5, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(6, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(7, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
- .baselineValues(8, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0, 5, "Bachelors Degree", "S", "F", "Store Management")
- .baselineValues(9, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0, 5, "Graduate Degree", "M", "F", "Store Management")
- .baselineValues(10, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0, 1, "Partial College", "M", "M", "Senior Management")
- .baselineValues(11, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0, 5, "Graduate Degree", "S", "M", "Store Management")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+ "short_field", "decimal_field")
+ .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
+ .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(5, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(6, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(7, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(8, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0, 5, "Bachelors Degree", "S", "F", "Store Management", null, null, null, null, null, null, null, null)
+ .baselineValues(9, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0, 5, "Graduate Degree", "M", "F", "Store Management", null, null, null, null, null, null, null, null)
+ .baselineValues(10, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0, 1, "Partial College", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(11, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0, 5, "Graduate Degree", "S", "M", "Store Management", null, null, null, null, null, null, null, null)
.go();
}
@@ -347,9 +358,13 @@ public class ElasticSearchQueryTest extends ClusterTest {
.unOrdered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+ "short_field", "decimal_field")
.baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26",
- "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
+ "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management",
+ Base64.getEncoder().encodeToString("Senior Management".getBytes()), true,
+ "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
.go();
}
@@ -415,10 +430,12 @@ public class ElasticSearchQueryTest extends ClusterTest {
.ordered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
- .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
- .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management")
- .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+ "short_field", "decimal_field")
+ .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
+ .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+ .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null)
.go();
}
@@ -538,10 +555,12 @@ public class ElasticSearchQueryTest extends ClusterTest {
.baselineColumns("full_name")
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role", "full_name0")
- .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", 123)
- .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", 123)
- .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", 123)
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+ "short_field", "decimal_field", "full_name0")
+ .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45, 123)
+ .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, 123)
+ .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, 123)
.go();
}
@@ -590,9 +609,13 @@ public class ElasticSearchQueryTest extends ClusterTest {
.ordered()
.baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
"position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
- "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+ "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+ "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+ "short_field", "decimal_field")
.baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, LocalDate.parse("1961-08-26"),
- "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management")
+ "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management",
+ Base64.getEncoder().encodeToString("Senior Management".getBytes()), true,
+ "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
.go();
}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
similarity index 57%
rename from contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
rename to contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
index 9009ba7..dec9b76 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
@@ -15,10 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.cassandra;
-
-import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
+package org.apache.drill.exec.store.elasticsearch;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.test.BaseTest;
@@ -27,24 +24,28 @@ import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
-import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
@Category(SlowTest.class)
@RunWith(Suite.class)
-@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class})
-public class TestCassandraSuit extends BaseTest {
+@Suite.SuiteClasses({ElasticComplexTypesTest.class, ElasticInfoSchemaTest.class, ElasticSearchPlanTest.class, ElasticSearchQueryTest.class})
+public class TestElasticsearchSuite extends BaseTest {
- protected static CassandraContainer<?> cassandra;
+ protected static ElasticsearchContainer elasticsearch;
private static final AtomicInteger initCount = new AtomicInteger(0);
private static volatile boolean runningSuite = false;
@BeforeClass
- public static void initCassandra() {
- synchronized (TestCassandraSuit.class) {
+ public static void initElasticsearch() {
+ synchronized (TestElasticsearchSuite.class) {
if (initCount.get() == 0) {
- startCassandra();
+ startElasticsearch();
}
initCount.incrementAndGet();
runningSuite = true;
@@ -57,19 +58,22 @@ public class TestCassandraSuit extends BaseTest {
@AfterClass
public static void tearDownCluster() {
- synchronized (TestCassandraSuit.class) {
- if (initCount.decrementAndGet() == 0 && cassandra != null) {
- cassandra.stop();
+ synchronized (TestElasticsearchSuite.class) {
+ if (initCount.decrementAndGet() == 0 && elasticsearch != null) {
+ elasticsearch.stop();
}
}
}
- private static void startCassandra() {
- cassandra = new CassandraContainer<>("cassandra")
- .withInitScript("queries.cql")
- .withStartupTimeout(Duration.ofMinutes(2))
- .withEnv("CASSANDRA_SNITCH", "GossipingPropertyFileSnitch") // Tune Cassandra options for faster startup
- .withEnv("JVM_OPTS", "-Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.initial_token=0");
- cassandra.start();
+ private static void startElasticsearch() {
+ DockerImageName imageName = DockerImageName.parse("elasticsearch:7.14.2")
+ .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ TestElasticsearchSuite.elasticsearch = new ElasticsearchContainer(imageName)
+ .withStartupTimeout(Duration.ofMinutes(2));
+ TestElasticsearchSuite.elasticsearch.start();
+ }
+
+ public static String getAddress() {
+ return elasticsearch.getHttpHostAddress();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
index d81db76..e8cad3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
@@ -219,7 +219,7 @@ public interface ColumnConverter {
}
}
- private MinorType getScalarMinorType(Class<?> clazz) {
+ protected MinorType getScalarMinorType(Class<?> clazz) {
if (clazz == byte.class || clazz == Byte.class) {
return MinorType.TINYINT;
} else if (clazz == short.class || clazz == Short.class) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
similarity index 64%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
index 83f8f72..28aec71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
@@ -15,22 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.enumerable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
-import java.util.Map;
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+public interface ColumnConverterFactoryProvider {
-public interface EnumerablePrelContext {
-
- String generateCode(RelOptCluster cluster, RelNode relNode);
-
- RelNode transformNode(RelNode input);
-
- Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
- String getPlanPrefix();
-
- String getTablePath(RelNode input);
+ ColumnConverterFactory getFactory(TupleMetadata schema);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
similarity index 61%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
index 83f8f72..75573ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
@@ -15,22 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.enumerable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
-import java.util.Map;
+public class DefaultColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+ public static ColumnConverterFactoryProvider INSTANCE = new DefaultColumnConverterFactoryProvider();
-public interface EnumerablePrelContext {
-
- String generateCode(RelOptCluster cluster, RelNode relNode);
-
- RelNode transformNode(RelNode input);
-
- Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
- String getPlanPrefix();
-
- String getTablePath(RelNode input);
+ @Override
+ public ColumnConverterFactory getFactory(TupleMetadata schema) {
+ return new ColumnConverterFactory(schema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
index 9c4ca54..2dec45a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
@@ -61,7 +61,7 @@ public class EnumerableBatchCreator implements BatchCreator<EnumerableSubScan> {
builder.providedSchema(subScan.getSchema());
ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
- subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath());
+ subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
builder.setReaderFactory(readerFactory);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
index 815a5c2..e74a97f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
@@ -39,6 +39,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
private final List<SchemaPath> columns;
private final double rows;
private final TupleMetadata schema;
+ private final ColumnConverterFactoryProvider converterFactoryProvider;
@JsonCreator
public EnumerableGroupScan(
@@ -47,7 +48,8 @@ public class EnumerableGroupScan extends AbstractGroupScan {
@JsonProperty("fieldsMap") Map<String, Integer> fieldsMap,
@JsonProperty("rows") double rows,
@JsonProperty("schema") TupleMetadata schema,
- @JsonProperty("schemaPath") String schemaPath) {
+ @JsonProperty("schemaPath") String schemaPath,
+ @JsonProperty("converterFactoryProvider") ColumnConverterFactoryProvider converterFactoryProvider) {
super("");
this.code = code;
this.columns = columns;
@@ -55,6 +57,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
this.rows = rows;
this.schema = schema;
this.schemaPath = schemaPath;
+ this.converterFactoryProvider = converterFactoryProvider;
}
@Override
@@ -63,7 +66,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
@Override
public SubScan getSpecificScan(int minorFragmentId) {
- return new EnumerableSubScan(code, columns, fieldsMap, schema, schemaPath);
+ return new EnumerableSubScan(code, columns, fieldsMap, schema, schemaPath, converterFactoryProvider);
}
@Override
@@ -105,6 +108,10 @@ public class EnumerableGroupScan extends AbstractGroupScan {
return schemaPath;
}
+ public ColumnConverterFactoryProvider getConverterFactoryProvider() {
+ return converterFactoryProvider;
+ }
+
@Override
public String getDigest() {
return toString();
@@ -113,7 +120,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath);
+ return new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath, converterFactoryProvider);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
index 85ea7c4..ee07923 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
@@ -66,17 +66,21 @@ public class EnumerableRecordReader implements ManagedReader<SchemaNegotiator> {
private final String schemaPath;
+ private final ColumnConverterFactoryProvider factoryProvider;
+
private ColumnConverter converter;
private Iterator<Map<String, Object>> records;
private ResultSetLoader loader;
- public EnumerableRecordReader(List<SchemaPath> columns, Map<String, Integer> fieldsMap, String code, String schemaPath) {
+ public EnumerableRecordReader(List<SchemaPath> columns, Map<String, Integer> fieldsMap,
+ String code, String schemaPath, ColumnConverterFactoryProvider factoryProvider) {
this.columns = columns;
this.fieldsMap = fieldsMap;
this.code = code;
this.schemaPath = schemaPath;
+ this.factoryProvider = factoryProvider;
}
@SuppressWarnings("unchecked")
@@ -140,7 +144,7 @@ public class EnumerableRecordReader implements ManagedReader<SchemaNegotiator> {
TupleMetadata providedSchema = negotiator.providedSchema();
loader = negotiator.build();
setup(negotiator.context());
- ColumnConverterFactory factory = new ColumnConverterFactory(providedSchema);
+ ColumnConverterFactory factory = factoryProvider.getFactory(providedSchema);
converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
index 85d282b..4476be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
@@ -35,6 +35,7 @@ public class EnumerableSubScan extends AbstractSubScan {
private final List<SchemaPath> columns;
private final Map<String, Integer> fieldsMap;
private final TupleMetadata schema;
+ private final ColumnConverterFactoryProvider converterFactoryProvider;
@JsonCreator
public EnumerableSubScan(
@@ -42,13 +43,15 @@ public class EnumerableSubScan extends AbstractSubScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("fieldsMap") Map<String, Integer> fieldsMap,
@JsonProperty("schema") TupleMetadata schema,
- @JsonProperty("schemaPath") String schemaPath) {
+ @JsonProperty("schemaPath") String schemaPath,
+ @JsonProperty("converterFactoryProvider") ColumnConverterFactoryProvider converterFactoryProvider) {
super("");
this.code = code;
this.columns = columns;
this.fieldsMap = fieldsMap;
this.schema = schema;
this.schemaPath = schemaPath;
+ this.converterFactoryProvider = converterFactoryProvider;
}
@Override
@@ -75,4 +78,8 @@ public class EnumerableSubScan extends AbstractSubScan {
public String getSchemaPath() {
return schemaPath;
}
+
+ public ColumnConverterFactoryProvider factoryProvider() {
+ return converterFactoryProvider;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
index 0df66b5..2d6e5c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.physical.LeafPrel;
@@ -34,6 +35,7 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
import org.apache.drill.exec.store.enumerable.EnumerableGroupScan;
import java.io.IOException;
@@ -54,6 +56,7 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
private final Map<String, Integer> fieldsMap;
private final TupleMetadata schema;
private final String planPrefix;
+ private final ColumnConverterFactoryProvider factoryProvider;
public EnumerablePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, EnumerablePrelContext context) {
super(cluster, traitSet);
@@ -77,6 +80,7 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ factoryProvider = context.factoryProvider();
}
@Override
@@ -84,7 +88,8 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
List<SchemaPath> columns = rowType.getFieldNames().stream()
.map(SchemaPath::getSimplePath)
.collect(Collectors.toList());
- EnumerableGroupScan groupScan = new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath);
+ GroupScan groupScan =
+ new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath, factoryProvider);
return creator.addMetadata(this, groupScan);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
index 83f8f72..6250421 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.store.enumerable.plan;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.DefaultColumnConverterFactoryProvider;
import java.util.Map;
@@ -33,4 +35,8 @@ public interface EnumerablePrelContext {
String getPlanPrefix();
String getTablePath(RelNode input);
+
+ default ColumnConverterFactoryProvider factoryProvider() {
+ return DefaultColumnConverterFactoryProvider.INSTANCE;
+ }
}