Posted to commits@drill.apache.org by dz...@apache.org on 2022/03/03 05:00:31 UTC

[drill] branch master updated: DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1597779  DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)
1597779 is described below

commit 1597779464c988edd6bffeef08c2542173207aa4
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Mar 3 07:00:23 2022 +0200

    DRILL-8151: Add support for more ElasticSearch and Cassandra data types (#2477)
---
 contrib/storage-cassandra/pom.xml                  |   2 +-
 .../cassandra/CassandraColumnConverterFactory.java | 144 +++++++++++++++++++++
 .../CassandraColumnConverterFactoryProvider.java   |  25 ++--
 .../plan/CassandraEnumerablePrelContext.java       |   7 +
 .../exec/store/cassandra/BaseCassandraTest.java    |   8 +-
 .../exec/store/cassandra/CassandraQueryTest.java   |  91 +++++++++----
 ...tCassandraSuit.java => TestCassandraSuite.java} |   6 +-
 .../src/test/resources/queries.cql                 |  18 ++-
 contrib/storage-elasticsearch/pom.xml              |  52 +++-----
 .../ElasticsearchColumnConverterFactory.java       |  65 ++++++++++
 ...lasticsearchColumnConverterFactoryProvider.java |  25 ++--
 .../plan/ElasticSearchEnumerablePrelContext.java   |   7 +
 .../elasticsearch/ElasticComplexTypesTest.java     |   9 +-
 .../store/elasticsearch/ElasticInfoSchemaTest.java |   8 +-
 .../store/elasticsearch/ElasticSearchPlanTest.java |   8 +-
 .../elasticsearch/ElasticSearchQueryTest.java      |  77 +++++++----
 .../elasticsearch/TestElasticsearchSuite.java}     |  46 ++++---
 .../apache/drill/exec/record/ColumnConverter.java  |   2 +-
 ...xt.java => ColumnConverterFactoryProvider.java} |  22 +---
 ... => DefaultColumnConverterFactoryProvider.java} |  24 ++--
 .../store/enumerable/EnumerableBatchCreator.java   |   2 +-
 .../exec/store/enumerable/EnumerableGroupScan.java |  13 +-
 .../store/enumerable/EnumerableRecordReader.java   |   8 +-
 .../exec/store/enumerable/EnumerableSubScan.java   |   9 +-
 .../exec/store/enumerable/plan/EnumerablePrel.java |   7 +-
 .../enumerable/plan/EnumerablePrelContext.java     |   6 +
 26 files changed, 496 insertions(+), 195 deletions(-)
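
A note on the mechanics: the commit introduces a ColumnConverterFactoryProvider
extension point on EnumerablePrelContext, plus per-plugin ColumnConverterFactory
subclasses that coerce driver-native Java objects (Duration, BigInteger, UUID,
Inet4Address, ...) into the types Drill's value vectors expect, and it moves the
Elasticsearch tests from a fixed local node to testcontainers. As a minimal
sketch of one such coercion, here is the varint handling (Cassandra returns
varint columns as java.math.BigInteger, which the new factory narrows to Drill
BIGINT), mirroring the BIGINT branch added below:

    import java.math.BigInteger;

    public class VarintToBigintSketch {
      public static void main(String[] args) {
        // The driver may hand back either BigInteger (varint) or Long (bigint);
        // both land in the same Drill BIGINT column.
        Object value = new BigInteger("123");
        long longValue = value instanceof BigInteger
            ? ((BigInteger) value).longValue()
            : (Long) value;
        System.out.println(longValue); // 123
      }
    }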

diff --git a/contrib/storage-cassandra/pom.xml b/contrib/storage-cassandra/pom.xml
index 5652b94..5fa0273 100644
--- a/contrib/storage-cassandra/pom.xml
+++ b/contrib/storage-cassandra/pom.xml
@@ -107,7 +107,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <includes>
-            <include>**/TestCassandraSuit.class</include>
+            <include>**/TestCassandraSuite.class</include>
           </includes>
           <excludes>
             <exclude>**/CassandraComplexTypesTest.java</exclude>
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java
new file mode 100644
index 0000000..861b83f
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactory.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.datastax.driver.core.Duration;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.Inet4Address;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class CassandraColumnConverterFactory extends ColumnConverterFactory {
+
+  private static final PeriodFormatter FORMATTER = new PeriodFormatterBuilder()
+    .appendYears()
+    .appendSuffix("Y")
+    .appendMonths()
+    .appendSuffix("M")
+    .appendWeeks()
+    .appendSuffix("W")
+    .appendDays()
+    .appendSuffix("D")
+    .appendHours()
+    .appendSuffix("H")
+    .appendMinutes()
+    .appendSuffix("M")
+    .appendSecondsWithOptionalMillis()
+    .appendSuffix("S")
+    .toFormatter();
+
+  public CassandraColumnConverterFactory(TupleMetadata providedSchema) {
+    super(providedSchema);
+  }
+
+  @Override
+  public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+    switch (readerSchema.type()) {
+      case INTERVAL:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          Duration duration = (Duration) value;
+          writer.setPeriod(Period.parse(duration.toString(), FORMATTER));
+        });
+      case BIGINT:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          long longValue;
+          if (value instanceof BigInteger) {
+            longValue = ((BigInteger) value).longValue();
+          } else {
+            longValue = (Long) value;
+          }
+          writer.setLong(longValue);
+        });
+      case VARCHAR:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setString(value.toString()));
+      case VARDECIMAL:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal((BigDecimal) value));
+      case VARBINARY:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          byte[] bytes;
+          if (value instanceof Inet4Address) {
+            bytes = ((Inet4Address) value).getAddress();
+          } else if (value instanceof UUID) {
+            UUID uuid = (UUID) value;
+            bytes = ByteBuffer.wrap(new byte[16])
+              .order(ByteOrder.BIG_ENDIAN)
+              .putLong(uuid.getMostSignificantBits())
+              .putLong(uuid.getLeastSignificantBits())
+              .array();
+          } else {
+            bytes = (byte[]) value;
+          }
+          writer.setBytes(bytes, bytes.length);
+        });
+      case BIT:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setBoolean((Boolean) value));
+      default:
+        return super.buildScalar(readerSchema, writer);
+    }
+  }
+
+  @Override
+  protected ColumnConverter getMapConverter(TupleMetadata providedSchema,
+    TupleMetadata readerSchema, TupleWriter tupleWriter) {
+    Map<String, ColumnConverter> converters = StreamSupport.stream(readerSchema.spliterator(), false)
+      .collect(Collectors.toMap(
+        ColumnMetadata::name,
+        columnMetadata ->
+          getConverter(providedSchema, columnMetadata, tupleWriter.column(columnMetadata.name()))));
+
+    return new CassandraMapColumnConverter(this, providedSchema, tupleWriter, converters);
+  }
+
+  private static class CassandraMapColumnConverter extends ColumnConverter.MapColumnConverter {
+
+    public CassandraMapColumnConverter(ColumnConverterFactory factory, TupleMetadata providedSchema, TupleWriter tupleWriter, Map<String, ColumnConverter> converters) {
+      super(factory, providedSchema, tupleWriter, converters);
+    }
+
+    @Override
+    protected TypeProtos.MinorType getScalarMinorType(Class<?> clazz) {
+      if (clazz == Duration.class) {
+        return TypeProtos.MinorType.INTERVAL;
+      } else if (clazz == Inet4Address.class
+        || clazz == UUID.class) {
+        return TypeProtos.MinorType.VARBINARY;
+      } else if (clazz == java.math.BigInteger.class) {
+        return TypeProtos.MinorType.BIGINT;
+      } else if (clazz == org.apache.calcite.avatica.util.ByteString.class) {
+        return TypeProtos.MinorType.VARCHAR;
+      }
+      return super.getScalarMinorType(clazz);
+    }
+  }
+}
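
The FORMATTER above round-trips the driver's Duration through Joda. A
self-contained sketch of the parsing step (the exact rendering of
Duration.toString() is assumed here to use the same uppercase field suffixes
the formatter declares):

    import org.joda.time.Period;
    import org.joda.time.format.PeriodFormatter;
    import org.joda.time.format.PeriodFormatterBuilder;

    public class DurationParseSketch {
      public static void main(String[] args) {
        PeriodFormatter formatter = new PeriodFormatterBuilder()
            .appendDays().appendSuffix("D")
            .appendHours().appendSuffix("H")
            .appendMinutes().appendSuffix("M")
            .appendSecondsWithOptionalMillis().appendSuffix("S")
            .toFormatter();
        // The CQL literal 3d89h4m48s from the test data, as formatter input:
        Period period = formatter.parsePeriod("3D89H4M48S");
        System.out.println(period); // P3DT89H4M48S
      }
    }
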
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
similarity index 57%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
index 83f8f72..54254eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraColumnConverterFactoryProvider.java
@@ -15,22 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.cassandra;
 
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
 
-import java.util.Map;
+public class CassandraColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+  public static final ColumnConverterFactoryProvider INSTANCE = new CassandraColumnConverterFactoryProvider();
 
-public interface EnumerablePrelContext {
-
-  String generateCode(RelOptCluster cluster, RelNode relNode);
-
-  RelNode transformNode(RelNode input);
-
-  Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
-  String getPlanPrefix();
-
-  String getTablePath(RelNode input);
+  @Override
+  public ColumnConverterFactory getFactory(TupleMetadata schema) {
+    return new CassandraColumnConverterFactory(schema);
+  }
 }
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
index 06cf181..91e5608 100644
--- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.cassandra.CassandraColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
 import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
 
 import java.util.Collections;
@@ -78,4 +80,9 @@ public class CassandraEnumerablePrelContext implements EnumerablePrelContext {
     List<String> qualifiedName = scan.getTable().getQualifiedName();
     return String.join(".", qualifiedName.subList(0, qualifiedName.size() - 1));
   }
+
+  @Override
+  public ColumnConverterFactoryProvider factoryProvider() {
+    return CassandraColumnConverterFactoryProvider.INSTANCE;
+  }
 }
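
factoryProvider() is the new hook on EnumerablePrelContext (its declaration is
in the java-exec part of this diff); the enumerable reader asks the provider
for a ColumnConverterFactory built against the provided schema. A minimal
third-party-style provider following the same pattern (assuming, as the
DefaultColumnConverterFactoryProvider rename in the diffstat suggests, that the
base ColumnConverterFactory is directly instantiable):

    import org.apache.drill.exec.record.ColumnConverterFactory;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;

    public class MyPluginColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
      public static final ColumnConverterFactoryProvider INSTANCE =
          new MyPluginColumnConverterFactoryProvider();

      @Override
      public ColumnConverterFactory getFactory(TupleMetadata schema) {
        // A real plugin returns a subclass that overrides buildScalar()
        // for its driver-specific Java types.
        return new ColumnConverterFactory(schema);
      }
    }
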
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
index a8eb40f..faae847 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -27,8 +27,8 @@ public class BaseCassandraTest extends ClusterTest {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TestCassandraSuit.initCassandra();
-    initCassandraPlugin(TestCassandraSuit.cassandra);
+    TestCassandraSuite.initCassandra();
+    initCassandraPlugin(TestCassandraSuite.cassandra);
   }
 
   private static void initCassandraPlugin(CassandraContainer<?> cassandra) throws Exception {
@@ -46,8 +46,8 @@ public class BaseCassandraTest extends ClusterTest {
 
   @AfterClass
   public static void tearDownCassandra() {
-    if (TestCassandraSuit.isRunningSuite()) {
-      TestCassandraSuit.tearDownCluster();
+    if (TestCassandraSuite.isRunningSuite()) {
+      TestCassandraSuite.tearDownCluster();
     }
   }
 }
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
index e7aecaa..536a102 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
@@ -18,10 +18,15 @@
 package org.apache.drill.exec.store.cassandra;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.joda.time.Period;
 import org.junit.Test;
 
 import java.math.BigDecimal;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.LocalDate;
+import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -32,21 +37,24 @@ public class CassandraQueryTest extends BaseCassandraTest {
   @Test
   public void testSelectAll() throws Exception {
     testBuilder()
-        .sqlQuery("select * from cassandra.test_keyspace.`employee`")
-        .unOrdered()
+        .sqlQuery("select * from cassandra.test_keyspace.`employee` order by employee_id")
+        .ordered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
-        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
-        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management")
-        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management")
-        .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors Degree", "S", "F", "Store Management")
-        .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5, "Graduate Degree", "M", "F", "Store Management")
-        .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial College", "M", "M", "Senior Management")
-        .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5, "Graduate Degree", "S", "M", "Store Management")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+            "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+            "uuid_field", "varchar_field", "varint_field")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors Degree", "S", "F", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5, "Graduate Degree", "M", "F", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial College", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5, "Graduate Degree", "S", "M", "Store Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
         .go();
   }
 
@@ -76,9 +84,18 @@ public class CassandraQueryTest extends BaseCassandraTest {
         .unOrdered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+            "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+            "uuid_field", "varchar_field", "varint_field")
         .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26",
-            "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
+            "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management",
+            "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123,
+            new Period(0, 0, 0, 3, 0, 0, 0, 320688000),
+            InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L,
+          getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+          getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+            "abc", 123L)
         .go();
   }
 
@@ -144,10 +161,13 @@ public class CassandraQueryTest extends BaseCassandraTest {
         .ordered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
-        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management")
-        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management")
-        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+            "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+            "uuid_field", "varchar_field", "varint_field")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null)
         .go();
   }
 
@@ -275,10 +295,13 @@ public class CassandraQueryTest extends BaseCassandraTest {
         .baselineColumns("full_name")
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role", "full_name0")
-        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", 123)
-        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", 123)
-        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", 123)
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+            "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+            "uuid_field", "varchar_field", "varint_field", "full_name0")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", "Senior Management", "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123, new Period(0, 0, 0, 3, 0, 0, 0, 320688000), InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L, getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"), getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f") [...]
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null, 123)
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, null, null, null, null, null, null, 123)
         .go();
   }
 
@@ -327,9 +350,27 @@ public class CassandraQueryTest extends BaseCassandraTest {
         .ordered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "ascii_field", "blob_field", "boolean_field", "date_field", "decimal_field", "double_field",
+            "duration_field", "inet_field", "time_field", "timestamp_field", "timeuuid_field",
+            "uuid_field", "varchar_field", "varint_field")
         .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, LocalDate.parse("1961-08-26"),
-            "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management")
+            "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management",
+            "abc", "0000000000000003", true, 15008L, BigDecimal.valueOf(123), 321.123,
+            new Period(0, 0, 0, 3, 0, 0, 0, 320688000),
+            InetAddress.getByName("8.8.8.8").getAddress(), 14700000000000L, 1296705900000L,
+          getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+          getUuidBytes("50554d6e-29bb-11e5-b345-feff819cdc9f"),
+            "abc", 123L)
         .go();
   }
+
+  private static byte[] getUuidBytes(String name) {
+    UUID uuid = UUID.fromString(name);
+    return ByteBuffer.wrap(new byte[16])
+      .order(ByteOrder.BIG_ENDIAN)
+      .putLong(uuid.getMostSignificantBits())
+      .putLong(uuid.getLeastSignificantBits())
+      .array();
+  }
 }
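
getUuidBytes mirrors the VARBINARY branch in CassandraColumnConverterFactory:
both serialize a java.util.UUID as 16 big-endian bytes, so the baseline bytes
are simply the canonical UUID string with the dashes removed. A quick check
(illustration only, not part of the commit):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.UUID;

    public class UuidBytesSketch {
      public static void main(String[] args) {
        UUID uuid = UUID.fromString("50554d6e-29bb-11e5-b345-feff819cdc9f");
        byte[] bytes = ByteBuffer.wrap(new byte[16])
            .order(ByteOrder.BIG_ENDIAN)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .array();
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
          hex.append(String.format("%02x", b));
        }
        System.out.println(hex); // 50554d6e29bb11e5b345feff819cdc9f
      }
    }
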
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
similarity index 94%
copy from contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
copy to contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
index 9009ba7..98ead70 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
@@ -32,7 +32,7 @@ import org.testcontainers.containers.CassandraContainer;
 @Category(SlowTest.class)
 @RunWith(Suite.class)
 @Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class})
-public class TestCassandraSuit extends BaseTest {
+public class TestCassandraSuite extends BaseTest {
 
   protected static CassandraContainer<?> cassandra;
 
@@ -42,7 +42,7 @@ public class TestCassandraSuit extends BaseTest {
 
   @BeforeClass
   public static void initCassandra() {
-    synchronized (TestCassandraSuit.class) {
+    synchronized (TestCassandraSuite.class) {
       if (initCount.get() == 0) {
         startCassandra();
       }
@@ -57,7 +57,7 @@ public class TestCassandraSuit extends BaseTest {
 
   @AfterClass
   public static void tearDownCluster() {
-    synchronized (TestCassandraSuit.class) {
+    synchronized (TestCassandraSuite.class) {
       if (initCount.decrementAndGet() == 0 && cassandra != null) {
         cassandra.stop();
       }
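
The renamed suite keeps a reference count so that member tests, whether run via
the suite or standalone, share a single Cassandra container. Reconstructed from
the context above (initCount is an AtomicInteger and startCassandra() boots the
testcontainers instance in the unchanged parts of the file; the exact increment
placement is an assumption):

    public static void initCassandra() {
      synchronized (TestCassandraSuite.class) {
        if (initCount.get() == 0) {
          startCassandra();            // first caller starts the container
        }
        initCount.incrementAndGet();   // every caller takes a reference
      }
    }

    public static void tearDownCluster() {
      synchronized (TestCassandraSuite.class) {
        if (initCount.decrementAndGet() == 0 && cassandra != null) {
          cassandra.stop();            // last caller stops it
        }
      }
    }
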
diff --git a/contrib/storage-cassandra/src/test/resources/queries.cql b/contrib/storage-cassandra/src/test/resources/queries.cql
index 527d88c..6382c53 100644
--- a/contrib/storage-cassandra/src/test/resources/queries.cql
+++ b/contrib/storage-cassandra/src/test/resources/queries.cql
@@ -35,13 +35,27 @@ CREATE TABLE test_keyspace.employee (
     marital_status text,
     gender text,
     management_role text,
+    ascii_field ascii,
+    blob_field blob,
+    boolean_field boolean,
+    date_field date,
+    decimal_field decimal,
+    double_field double,
+    duration_field duration,
+    inet_field inet,
+    time_field time,
+    timestamp_field timestamp,
+    timeuuid_field timeuuid,
+    uuid_field uuid,
+    varchar_field varchar,
+    varint_field varint,
     PRIMARY KEY (full_name, employee_id)
 ) WITH CLUSTERING ORDER BY (employee_id ASC);
 
 USE test_keyspace;
 
-INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
- VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1, '1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role, ascii_field, blob_field, boolean_field, date_field, decimal_field, double_field, duration_field, inet_field, time_field,  timestamp_field, timeuuid_field, uuid_field, varchar_field, varint_field)
+ VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1, '1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F', 'Senior Management', 'abc', bigintAsBlob(3), true, '2011-02-03', 123.456, 321.123, 3d89h4m48s, '8.8.8.8', '04:05:00',  '2011-02-03 04:05:00', 50554d6e-29bb-11e5-b345-feff819cdc9f, 50554d6e-29bb-11e5-b345-feff819cdc9f, 'abc', 123);
 INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
  VALUES (2, 'Derrick Whelply', 'Derrick', 'Whelply', 2, 'VP Country Manager',0,1, '1915-07-03', '1994-12-01 00:00:00.0',40000.0 ,1, 'Graduate Degree', 'M', 'M', 'Senior Management');
 INSERT INTO employee(employee_id, full_name, first_name, last_name, position_id, position_title, store_id, department_id, birth_date, hire_date, salary, supervisor_id, education_level, marital_status, gender, management_role)
diff --git a/contrib/storage-elasticsearch/pom.xml b/contrib/storage-elasticsearch/pom.xml
index 40757b7..d645646 100644
--- a/contrib/storage-elasticsearch/pom.xml
+++ b/contrib/storage-elasticsearch/pom.xml
@@ -20,9 +20,6 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <properties>
-    <test.elasticsearch.version>7.10.1</test.elasticsearch.version>
-  </properties>
   <parent>
     <artifactId>drill-contrib-parent</artifactId>
     <groupId>org.apache.drill.contrib</groupId>
@@ -80,6 +77,18 @@
       <version>0.4</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.datastax.cassandra</groupId>
+          <artifactId>cassandra-driver-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
@@ -88,36 +97,17 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <forkCount combine.self="override">1</forkCount>
+          <includes>
+            <include>**/TestElasticsearchSuite.class</include>
+          </includes>
+          <excludes>
+            <exclude>**/ElasticComplexTypesTest.java</exclude>
+            <exclude>**/ElasticInfoSchemaTest.java</exclude>
+            <exclude>**/ElasticSearchPlanTest.java</exclude>
+            <exclude>**/ElasticSearchQueryTest.java</exclude>
+          </excludes>
         </configuration>
       </plugin>
-      <plugin>
-        <groupId>com.github.alexcojocaru</groupId>
-        <artifactId>elasticsearch-maven-plugin</artifactId>
-        <version>6.19</version>
-        <configuration>
-          <version>${test.elasticsearch.version}</version>
-          <clusterName>test</clusterName>
-          <transportPort>9300</transportPort>
-          <httpPort>9200</httpPort>
-          <skip>${skipTests}</skip>
-        </configuration>
-        <executions>
-          <execution>
-            <id>start-elasticsearch</id>
-            <phase>process-test-classes</phase>
-            <goals>
-              <goal>runforked</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>stop-elasticsearch</id>
-            <phase>post-integration-test</phase>
-            <goals>
-              <goal>stop</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 
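The removed elasticsearch-maven-plugin pinned a local node to port 9200; the
testcontainers dependency added above lets the new TestElasticsearchSuite start
Elasticsearch on a random mapped port instead. Roughly (a sketch only: the
image tag is inferred from the removed test.elasticsearch.version property, and
getAddress() is assumed to prepend the scheme to getHttpHostAddress(), which
the test changes below call):

    import org.testcontainers.elasticsearch.ElasticsearchContainer;

    public class ElasticsearchContainerSketch {
      public static void main(String[] args) {
        ElasticsearchContainer elasticsearch = new ElasticsearchContainer(
            "docker.elastic.co/elasticsearch/elasticsearch:7.10.1");
        elasticsearch.start();
        // getHttpHostAddress() returns "host:mappedPort"; the storage config
        // expects a full URI.
        String address = "http://" + elasticsearch.getHttpHostAddress();
        System.out.println(address);
        elasticsearch.stop();
      }
    }
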
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java
new file mode 100644
index 0000000..9e2ee71
--- /dev/null
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class ElasticsearchColumnConverterFactory extends ColumnConverterFactory {
+
+  public ElasticsearchColumnConverterFactory(TupleMetadata providedSchema) {
+    super(providedSchema);
+  }
+
+  @Override
+  public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+    switch (readerSchema.type()) {
+      case BIT:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setBoolean((Boolean) value));
+      default:
+        return super.buildScalar(readerSchema, writer);
+    }
+  }
+
+  @Override
+  protected ColumnConverter getMapConverter(TupleMetadata providedSchema,
+    TupleMetadata readerSchema, TupleWriter tupleWriter) {
+    Map<String, ColumnConverter> converters = StreamSupport.stream(readerSchema.spliterator(), false)
+      .collect(Collectors.toMap(
+        ColumnMetadata::name,
+        columnMetadata ->
+          getConverter(providedSchema, columnMetadata, tupleWriter.column(columnMetadata.name()))));
+
+    return new ElasticMapColumnConverter(this, providedSchema, tupleWriter, converters);
+  }
+
+  private static class ElasticMapColumnConverter extends ColumnConverter.MapColumnConverter {
+
+    public ElasticMapColumnConverter(ColumnConverterFactory factory, TupleMetadata providedSchema, TupleWriter tupleWriter, Map<String, ColumnConverter> converters) {
+      super(factory, providedSchema, tupleWriter, converters);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
similarity index 57%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
index 83f8f72..40cce4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchColumnConverterFactoryProvider.java
@@ -15,22 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.elasticsearch;
 
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
 
-import java.util.Map;
+public class ElasticsearchColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+  public static final ColumnConverterFactoryProvider INSTANCE = new ElasticsearchColumnConverterFactoryProvider();
 
-public interface EnumerablePrelContext {
-
-  String generateCode(RelOptCluster cluster, RelNode relNode);
-
-  RelNode transformNode(RelNode input);
-
-  Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
-  String getPlanPrefix();
-
-  String getTablePath(RelNode input);
+  @Override
+  public ColumnConverterFactory getFactory(TupleMetadata schema) {
+    return new ElasticsearchColumnConverterFactory(schema);
+  }
 }
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
index 29db1ca..e1a92bb 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticSearchEnumerablePrelContext.java
@@ -27,6 +27,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.elasticsearch.ElasticsearchColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
 import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
 
 import java.util.Collections;
@@ -76,4 +78,9 @@ public class ElasticSearchEnumerablePrelContext implements EnumerablePrelContext
   public String getTablePath(RelNode input) {
     return null;
   }
+
+  @Override
+  public ColumnConverterFactoryProvider factoryProvider() {
+    return ElasticsearchColumnConverterFactoryProvider.INSTANCE;
+  }
 }
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
index 43d4681..9f4f52d 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
@@ -46,18 +46,17 @@ import static org.apache.drill.test.TestBuilder.mapOf;
 
 public class ElasticComplexTypesTest extends ClusterTest {
 
-  private static final String HOST = "http://localhost:9200";
-
   private static final List<String> indexNames = new ArrayList<>();
 
   public static RestHighLevelClient restHighLevelClient;
 
   @BeforeClass
   public static void init() throws Exception {
+    TestElasticsearchSuite.initElasticsearch();
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
@@ -69,10 +68,12 @@ public class ElasticComplexTypesTest extends ClusterTest {
     for (String indexName : indexNames) {
       restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
     }
+    TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+    restHighLevelClient = new RestHighLevelClient(
+      RestClient.builder(HttpHost.create(TestElasticsearchSuite.elasticsearch.getHttpHostAddress())));
 
     String indexName = "arr";
     indexNames.add(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
index cad11bb..aa4dae4 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
@@ -41,18 +41,17 @@ import java.util.List;
 
 public class ElasticInfoSchemaTest extends ClusterTest {
 
-  private static final String HOST = "http://localhost:9200";
-
   private static final List<String> indexNames = new ArrayList<>();
 
   public static RestHighLevelClient restHighLevelClient;
 
   @BeforeClass
   public static void init() throws Exception {
+    TestElasticsearchSuite.initElasticsearch();
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
@@ -64,10 +63,11 @@ public class ElasticInfoSchemaTest extends ClusterTest {
     for (String indexName : indexNames) {
       restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
     }
+    TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
 
     String indexName = "t1";
     indexNames.add(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
index 74773af..db81edf 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
@@ -39,18 +39,17 @@ import java.util.Collections;
 
 public class ElasticSearchPlanTest extends ClusterTest {
 
-  private static final String HOST = "http://localhost:9200";
-
   public static RestHighLevelClient restHighLevelClient;
 
   private static String indexName;
 
   @BeforeClass
   public static void init() throws Exception {
+    TestElasticsearchSuite.initElasticsearch();
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
@@ -60,10 +59,11 @@ public class ElasticSearchPlanTest extends ClusterTest {
   @AfterClass
   public static void cleanUp() throws IOException {
     restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+    TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
 
     indexName = "nation";
     CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
index 79f9344..374c449 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.util.Base64;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -46,18 +47,17 @@ import static org.junit.Assert.fail;
 
 public class ElasticSearchQueryTest extends ClusterTest {
 
-  private static final String HOST = "http://localhost:9200";
-
   public static RestHighLevelClient restHighLevelClient;
 
   private static String indexName;
 
   @BeforeClass
   public static void init() throws Exception {
+    TestElasticsearchSuite.initElasticsearch();
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        Collections.singletonList(TestElasticsearchSuite.getAddress()), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
@@ -67,10 +67,11 @@ public class ElasticSearchQueryTest extends ClusterTest {
   @AfterClass
   public static void cleanUp() throws IOException {
     restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+    TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(HOST)));
+    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
 
     indexName = "employee";
     CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
@@ -95,6 +96,14 @@ public class ElasticSearchQueryTest extends ClusterTest {
     builder.field("marital_status", "S");
     builder.field("gender", "F");
     builder.field("management_role", "Senior Management");
+    builder.field("binary_field", "Senior Management".getBytes());
+    builder.field("boolean_field", true);
+    builder.timeField("date_field", "2015/01/01 12:10:30");
+    builder.field("byte_field", (byte) 123);
+    builder.field("long_field", 123L);
+    builder.field("float_field", 123F);
+    builder.field("short_field", (short) 123);
+    builder.field("decimal_field", new BigDecimal("123.45"));
     builder.endObject();
     IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
     restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
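
Elasticsearch returns binary fields as base64 strings, which is why the
baselines below compare binary_field against an encoded value rather than raw
bytes. For reference:

    import java.util.Base64;

    public class Base64BaselineSketch {
      public static void main(String[] args) {
        System.out.println(Base64.getEncoder()
            .encodeToString("Senior Management".getBytes()));
        // U2VuaW9yIE1hbmFnZW1lbnQ=
      }
    }
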
@@ -307,17 +316,19 @@ public class ElasticSearchQueryTest extends ClusterTest {
         .unOrdered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
-        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
-        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management")
-        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management")
-        .baselineValues(5, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(6, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(7, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0, 1, "Bachelors Degree", "M", "F", "Senior Management")
-        .baselineValues(8, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0, 5, "Bachelors Degree", "S", "F", "Store Management")
-        .baselineValues(9, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0, 5, "Graduate Degree", "M", "F", "Store Management")
-        .baselineValues(10, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0, 1, "Partial College", "M", "M", "Senior Management")
-        .baselineValues(11, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0, 5, "Graduate Degree", "S", "M", "Store Management")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+          "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+          "short_field", "decimal_field")
+        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
+        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(5, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(6, "Roberta Damstra", "Roberta", "Damstra", 3, "VP Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(7, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0, 1, "Bachelors Degree", "M", "F", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(8, "Kim Brunner", "Kim", "Brunner", 11, "Store Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0, 5, "Bachelors Degree", "S", "F", "Store Management", null, null, null, null, null, null, null, null)
+        .baselineValues(9, "Brenda Blumberg", "Brenda", "Blumberg", 11, "Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0, 5, "Graduate Degree", "M", "F", "Store Management", null, null, null, null, null, null, null, null)
+        .baselineValues(10, "Darren Stanz", "Darren", "Stanz", 5, "VP Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0, 1, "Partial College", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(11, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, "Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0, 5, "Graduate Degree", "S", "M", "Store Management", null, null, null, null, null, null, null, null)
         .go();
   }
 
@@ -347,9 +358,13 @@ public class ElasticSearchQueryTest extends ClusterTest {
         .unOrdered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+            "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+            "short_field", "decimal_field")
         .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26",
-            "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
+            "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management",
+          Base64.getEncoder().encodeToString("Senior Management".getBytes()), true,
+          "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
         .go();
   }
 
@@ -415,10 +430,12 @@ public class ElasticSearchQueryTest extends ClusterTest {
         .ordered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
-        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management")
-        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management")
-        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+          "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+          "short_field", "decimal_field")
+        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
+        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null)
+        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null)
         .go();
   }
 
@@ -538,10 +555,12 @@ public class ElasticSearchQueryTest extends ClusterTest {
         .baselineColumns("full_name")
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role", "full_name0")
-        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", 123)
-        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", 123)
-        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", 123)
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+          "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+          "short_field", "decimal_field", "full_name0")
+        .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0, 0, "Graduate Degree", "S", "F", "Senior Management", Base64.getEncoder().encodeToString("Senior Management".getBytes()), true, "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45, 123)
+        .baselineValues(2, "Derrick Whelply", "Derrick", "Whelply", 2, "VP Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "M", "M", "Senior Management", null, null, null, null, null, null, null, null, 123)
+        .baselineValues(4, "Michael Spence", "Michael", "Spence", 2, "VP Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0, 1, "Graduate Degree", "S", "M", "Senior Management", null, null, null, null, null, null, null, null, 123)
         .go();
   }
 
@@ -590,9 +609,13 @@ public class ElasticSearchQueryTest extends ClusterTest {
         .ordered()
         .baselineColumns("employee_id", "full_name", "first_name", "last_name", "position_id",
             "position_title", "store_id", "department_id", "birth_date", "hire_date", "salary",
-            "supervisor_id", "education_level", "marital_status", "gender", "management_role")
+            "supervisor_id", "education_level", "marital_status", "gender", "management_role",
+          "binary_field", "boolean_field", "date_field", "byte_field", "long_field", "float_field",
+          "short_field", "decimal_field")
         .baselineValues(1, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 0, 1, LocalDate.parse("1961-08-26"),
-            "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management")
+            "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate Degree", "S", "F", "Senior Management",
+          Base64.getEncoder().encodeToString("Senior Management".getBytes()), true,
+          "2015/01/01 12:10:30", 123, 123, 123., 123, 123.45)
         .go();
   }
 
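The eight new columns threaded through the baselines above follow a single pattern: a Base64-encoded binary value plus boolean, date, byte, long, float, short and decimal scalars on the row that defines them, and nulls everywhere else. A minimal sketch of that verification pattern, assuming an illustrative elastic.employee table (the real test registers its own plugin and index):

    // Sketch only: plugin and table names are illustrative.
    import java.util.Base64;

    import org.apache.drill.test.ClusterTest;
    import org.junit.Test;

    public class ExampleBaselineTest extends ClusterTest {

      @Test
      public void testNewScalarTypes() throws Exception {
        testBuilder()
            .sqlQuery("SELECT employee_id, boolean_field, binary_field FROM elastic.`employee`")
            .unOrdered()
            .baselineColumns("employee_id", "boolean_field", "binary_field")
            // Binary columns surface as Base64 strings, so the expected value
            // is encoded the same way; rows lacking the field expect null.
            .baselineValues(1, true, Base64.getEncoder().encodeToString("Senior Management".getBytes()))
            .baselineValues(2, null, null)
            .go();
      }
    }
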
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
similarity index 57%
rename from contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
rename to contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
index 9009ba7..dec9b76 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuit.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
@@ -15,10 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.cassandra;
-
-import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
+package org.apache.drill.exec.store.elasticsearch;
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTest;
@@ -27,24 +24,28 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
-import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Category(SlowTest.class)
 @RunWith(Suite.class)
-@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class})
-public class TestCassandraSuit extends BaseTest {
+@Suite.SuiteClasses({ElasticComplexTypesTest.class, ElasticInfoSchemaTest.class, ElasticSearchPlanTest.class, ElasticSearchQueryTest.class})
+public class TestElasticsearchSuite extends BaseTest {
 
-  protected static CassandraContainer<?> cassandra;
+  protected static ElasticsearchContainer elasticsearch;
 
   private static final AtomicInteger initCount = new AtomicInteger(0);
 
   private static volatile boolean runningSuite = false;
 
   @BeforeClass
-  public static void initCassandra() {
-    synchronized (TestCassandraSuit.class) {
+  public static void initElasticsearch() {
+    synchronized (TestElasticsearchSuite.class) {
       if (initCount.get() == 0) {
-        startCassandra();
+        startElasticsearch();
       }
       initCount.incrementAndGet();
       runningSuite = true;
@@ -57,19 +58,22 @@ public class TestCassandraSuit extends BaseTest {
 
   @AfterClass
   public static void tearDownCluster() {
-    synchronized (TestCassandraSuit.class) {
-      if (initCount.decrementAndGet() == 0 && cassandra != null) {
-        cassandra.stop();
+    synchronized (TestElasticsearchSuite.class) {
+      if (initCount.decrementAndGet() == 0 && elasticsearch != null) {
+        elasticsearch.stop();
       }
     }
   }
 
-  private static void startCassandra() {
-    cassandra = new CassandraContainer<>("cassandra")
-      .withInitScript("queries.cql")
-      .withStartupTimeout(Duration.ofMinutes(2))
-      .withEnv("CASSANDRA_SNITCH", "GossipingPropertyFileSnitch") // Tune Cassandra options for faster startup
-      .withEnv("JVM_OPTS", "-Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.initial_token=0");
-    cassandra.start();
+  private static void startElasticsearch() {
+    DockerImageName imageName = DockerImageName.parse("elasticsearch:7.14.2")
+      .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+    TestElasticsearchSuite.elasticsearch = new ElasticsearchContainer(imageName)
+      .withStartupTimeout(Duration.ofMinutes(2));
+    TestElasticsearchSuite.elasticsearch.start();
+  }
+
+  public static String getAddress() {
+    return elasticsearch.getHttpHostAddress();
   }
 }
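
The suite above uses a reference-counted Testcontainers lifecycle: the first caller starts the container, the last teardown stops it, so member tests behave the same whether they run inside or outside the suite runner. The same pattern in isolation (a sketch; class and method names are illustrative):

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.testcontainers.elasticsearch.ElasticsearchContainer;
    import org.testcontainers.utility.DockerImageName;

    public class ContainerLifecycleSketch {
      private static ElasticsearchContainer container;
      private static final AtomicInteger refCount = new AtomicInteger();

      // First acquire starts the container.
      public static synchronized void acquire() {
        if (refCount.getAndIncrement() == 0) {
          DockerImageName image = DockerImageName.parse("elasticsearch:7.14.2")
              .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
          container = new ElasticsearchContainer(image)
              .withStartupTimeout(Duration.ofMinutes(2));
          container.start();
        }
      }

      // Last release stops it.
      public static synchronized void release() {
        if (refCount.decrementAndGet() == 0 && container != null) {
          container.stop();
        }
      }
    }
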
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
index d81db76..e8cad3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
@@ -219,7 +219,7 @@ public interface ColumnConverter {
       }
     }
 
-    private MinorType getScalarMinorType(Class<?> clazz) {
+    protected MinorType getScalarMinorType(Class<?> clazz) {
       if (clazz == byte.class || clazz == Byte.class) {
         return MinorType.TINYINT;
       } else if (clazz == short.class || clazz == Short.class) {
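
Widening getScalarMinorType from private to protected is what lets plugin-specific converters extend the Java-class-to-MinorType inference for source types the default mapping does not cover. A self-contained illustration (BaseConverter below is a stand-in for the nested class that actually declares the method, not the committed code):

    import org.apache.drill.common.types.TypeProtos.MinorType;

    // Stand-in for the class declaring getScalarMinorType.
    class BaseConverter {
      protected MinorType getScalarMinorType(Class<?> clazz) {
        if (clazz == String.class) {
          return MinorType.VARCHAR;
        }
        throw new IllegalArgumentException("Unsupported type: " + clazz);
      }
    }

    // A plugin subclass can now add mappings before delegating.
    class ExampleConverter extends BaseConverter {
      @Override
      protected MinorType getScalarMinorType(Class<?> clazz) {
        if (clazz == java.time.Instant.class) {
          return MinorType.TIMESTAMP;
        }
        return super.getScalarMinorType(clazz);
      }
    }
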
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
similarity index 64%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
index 83f8f72..28aec71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
@@ -15,22 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.enumerable;
 
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 
-import java.util.Map;
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+public interface ColumnConverterFactoryProvider {
 
-public interface EnumerablePrelContext {
-
-  String generateCode(RelOptCluster cluster, RelNode relNode);
-
-  RelNode transformNode(RelNode input);
-
-  Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
-  String getPlanPrefix();
-
-  String getTablePath(RelNode input);
+  ColumnConverterFactory getFactory(TupleMetadata schema);
 }
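
Because the interface carries @JsonTypeInfo(use = JsonTypeInfo.Id.NAME), a concrete provider travels inside the serialized physical plan under a logical type name. A sketch of what a plugin implementation might look like (the @JsonTypeName value and class name are illustrative):

    import com.fasterxml.jackson.annotation.JsonTypeName;

    import org.apache.drill.exec.record.ColumnConverterFactory;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    @JsonTypeName("example")
    public class ExampleColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {

      @Override
      public ColumnConverterFactory getFactory(TupleMetadata schema) {
        // A real plugin would return its own ColumnConverterFactory subclass.
        return new ColumnConverterFactory(schema);
      }
    }
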
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
similarity index 61%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
index 83f8f72..75573ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DefaultColumnConverterFactoryProvider.java
@@ -15,22 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.enumerable.plan;
+package org.apache.drill.exec.store.enumerable;
 
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 
-import java.util.Map;
+public class DefaultColumnConverterFactoryProvider implements ColumnConverterFactoryProvider {
+  public static ColumnConverterFactoryProvider INSTANCE = new DefaultColumnConverterFactoryProvider();
 
-public interface EnumerablePrelContext {
-
-  String generateCode(RelOptCluster cluster, RelNode relNode);
-
-  RelNode transformNode(RelNode input);
-
-  Map<String, Integer> getFieldsMap(RelNode transformedNode);
-
-  String getPlanPrefix();
-
-  String getTablePath(RelNode input);
+  @Override
+  public ColumnConverterFactory getFactory(TupleMetadata schema) {
+    return new ColumnConverterFactory(schema);
+  }
 }
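
Usage is a one-liner: the reader hands the provided schema to the provider and gets a factory back. A sketch, assuming the SchemaBuilder test utility to construct a schema:

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.ColumnConverterFactory;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class ProviderUsageSketch {
      public static ColumnConverterFactory stockFactory() {
        TupleMetadata schema = new SchemaBuilder()
            .addNullable("full_name", MinorType.VARCHAR)
            .buildSchema();
        // The default provider returns the stock factory unchanged.
        return DefaultColumnConverterFactoryProvider.INSTANCE.getFactory(schema);
      }
    }
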
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
index 9c4ca54..2dec45a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
@@ -61,7 +61,7 @@ public class EnumerableBatchCreator implements BatchCreator<EnumerableSubScan> {
     builder.providedSchema(subScan.getSchema());
 
     ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
-        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath());
+        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
index 815a5c2..e74a97f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableGroupScan.java
@@ -39,6 +39,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
   private final List<SchemaPath> columns;
   private final double rows;
   private final TupleMetadata schema;
+  private final ColumnConverterFactoryProvider converterFactoryProvider;
 
   @JsonCreator
   public EnumerableGroupScan(
@@ -47,7 +48,8 @@ public class EnumerableGroupScan extends AbstractGroupScan {
       @JsonProperty("fieldsMap") Map<String, Integer> fieldsMap,
       @JsonProperty("rows") double rows,
       @JsonProperty("schema") TupleMetadata schema,
-      @JsonProperty("schemaPath") String schemaPath) {
+      @JsonProperty("schemaPath") String schemaPath,
+      @JsonProperty("converterFactoryProvider") ColumnConverterFactoryProvider converterFactoryProvider) {
     super("");
     this.code = code;
     this.columns = columns;
@@ -55,6 +57,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
     this.rows = rows;
     this.schema = schema;
     this.schemaPath = schemaPath;
+    this.converterFactoryProvider = converterFactoryProvider;
   }
 
   @Override
@@ -63,7 +66,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new EnumerableSubScan(code, columns, fieldsMap, schema, schemaPath);
+    return new EnumerableSubScan(code, columns, fieldsMap, schema, schemaPath, converterFactoryProvider);
   }
 
   @Override
@@ -105,6 +108,10 @@ public class EnumerableGroupScan extends AbstractGroupScan {
     return schemaPath;
   }
 
+  public ColumnConverterFactoryProvider getConverterFactoryProvider() {
+    return converterFactoryProvider;
+  }
+
   @Override
   public String getDigest() {
     return toString();
@@ -113,7 +120,7 @@ public class EnumerableGroupScan extends AbstractGroupScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath);
+    return new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath, converterFactoryProvider);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
index 85ea7c4..ee07923 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableRecordReader.java
@@ -66,17 +66,21 @@ public class EnumerableRecordReader implements ManagedReader<SchemaNegotiator> {
 
   private final String schemaPath;
 
+  private final ColumnConverterFactoryProvider factoryProvider;
+
   private ColumnConverter converter;
 
   private Iterator<Map<String, Object>> records;
 
   private ResultSetLoader loader;
 
-  public EnumerableRecordReader(List<SchemaPath> columns, Map<String, Integer> fieldsMap, String code, String schemaPath) {
+  public EnumerableRecordReader(List<SchemaPath> columns, Map<String, Integer> fieldsMap,
+    String code, String schemaPath, ColumnConverterFactoryProvider factoryProvider) {
     this.columns = columns;
     this.fieldsMap = fieldsMap;
     this.code = code;
     this.schemaPath = schemaPath;
+    this.factoryProvider = factoryProvider;
   }
 
   @SuppressWarnings("unchecked")
@@ -140,7 +144,7 @@ public class EnumerableRecordReader implements ManagedReader<SchemaNegotiator> {
     TupleMetadata providedSchema = negotiator.providedSchema();
     loader = negotiator.build();
     setup(negotiator.context());
-    ColumnConverterFactory factory = new ColumnConverterFactory(providedSchema);
+    ColumnConverterFactory factory = factoryProvider.getFactory(providedSchema);
     converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
     return true;
   }
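
With the provider injected, open() no longer hard-codes the stock factory, so plugin-specific conversions apply while rows are loaded. Constructing a reader now looks like this (a sketch; the column, generated code and schema path values are illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.drill.common.expression.SchemaPath;

    public class ReaderWiringSketch {
      public static EnumerableRecordReader newReader(String generatedCode, String schemaPath) {
        List<SchemaPath> columns = Collections.singletonList(SchemaPath.getSimplePath("full_name"));
        Map<String, Integer> fieldsMap = Collections.singletonMap("full_name", 0);
        // The trailing provider argument is the new piece of state: it decides
        // which ColumnConverterFactory converts source objects into vectors.
        return new EnumerableRecordReader(columns, fieldsMap, generatedCode, schemaPath,
            DefaultColumnConverterFactoryProvider.INSTANCE);
      }
    }
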
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
index 85d282b..4476be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
@@ -35,6 +35,7 @@ public class EnumerableSubScan extends AbstractSubScan {
   private final List<SchemaPath> columns;
   private final Map<String, Integer> fieldsMap;
   private final TupleMetadata schema;
+  private final ColumnConverterFactoryProvider converterFactoryProvider;
 
   @JsonCreator
   public EnumerableSubScan(
@@ -42,13 +43,15 @@ public class EnumerableSubScan extends AbstractSubScan {
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("fieldsMap") Map<String, Integer> fieldsMap,
       @JsonProperty("schema") TupleMetadata schema,
-      @JsonProperty("schemaPath") String schemaPath) {
+      @JsonProperty("schemaPath") String schemaPath,
+      @JsonProperty("converterFactoryProvider") ColumnConverterFactoryProvider converterFactoryProvider) {
     super("");
     this.code = code;
     this.columns = columns;
     this.fieldsMap = fieldsMap;
     this.schema = schema;
     this.schemaPath = schemaPath;
+    this.converterFactoryProvider = converterFactoryProvider;
   }
 
   @Override
@@ -75,4 +78,8 @@ public class EnumerableSubScan extends AbstractSubScan {
   public String getSchemaPath() {
     return schemaPath;
   }
+
+  public ColumnConverterFactoryProvider factoryProvider() {
+    return converterFactoryProvider;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
index 0df66b5..2d6e5c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.physical.LeafPrel;
@@ -34,6 +35,7 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
 import org.apache.drill.exec.store.enumerable.EnumerableGroupScan;
 
 import java.io.IOException;
@@ -54,6 +56,7 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
   private final Map<String, Integer> fieldsMap;
   private final TupleMetadata schema;
   private final String planPrefix;
+  private final ColumnConverterFactoryProvider factoryProvider;
 
   public EnumerablePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, EnumerablePrelContext context) {
     super(cluster, traitSet);
@@ -77,6 +80,7 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    factoryProvider = context.factoryProvider();
   }
 
   @Override
@@ -84,7 +88,8 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
     List<SchemaPath> columns = rowType.getFieldNames().stream()
         .map(SchemaPath::getSimplePath)
         .collect(Collectors.toList());
-    EnumerableGroupScan groupScan = new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath);
+    GroupScan groupScan =
+      new EnumerableGroupScan(code, columns, fieldsMap, rows, schema, schemaPath, factoryProvider);
     return creator.addMetadata(this, groupScan);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
index 83f8f72..6250421 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.store.enumerable.plan;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;
+import org.apache.drill.exec.store.enumerable.DefaultColumnConverterFactoryProvider;
 
 import java.util.Map;
 
@@ -33,4 +35,8 @@ public interface EnumerablePrelContext {
   String getPlanPrefix();
 
   String getTablePath(RelNode input);
+
+  default ColumnConverterFactoryProvider factoryProvider() {
+    return DefaultColumnConverterFactoryProvider.INSTANCE;
+  }
 }
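
The default method closes the loop: existing EnumerablePrelContext implementations keep working unchanged, while plugins that need custom conversions override a single hook. A hypothetical override (only factoryProvider() matters here; the other bodies are minimal placeholders, and ExampleColumnConverterFactoryProvider is the illustrative provider sketched earlier):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.calcite.plan.RelOptCluster;
    import org.apache.calcite.rel.RelNode;
    import org.apache.drill.exec.store.enumerable.ColumnConverterFactoryProvider;

    public class ExamplePrelContext implements EnumerablePrelContext {

      @Override
      public ColumnConverterFactoryProvider factoryProvider() {
        return new ExampleColumnConverterFactoryProvider();
      }

      // Minimal placeholders for the sketch.
      @Override public String generateCode(RelOptCluster cluster, RelNode relNode) { return ""; }
      @Override public RelNode transformNode(RelNode input) { return input; }
      @Override public Map<String, Integer> getFieldsMap(RelNode transformedNode) { return Collections.emptyMap(); }
      @Override public String getPlanPrefix() { return "EXAMPLE"; }
      @Override public String getTablePath(RelNode input) { return ""; }
    }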