Posted to commits@drill.apache.org by lu...@apache.org on 2021/04/28 11:49:17 UTC

[drill] branch master updated: DRILL-7903: Update mongo driver from 3.12 to 4.2 (#2201)

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new ed0c304  DRILL-7903: Update mongo driver from 3.12 to 4.2 (#2201)
ed0c304 is described below

commit ed0c304389f880769066fea3ab8d55f7d1c8b5de
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Apr 28 14:49:04 2021 +0300

    DRILL-7903: Update mongo driver from 3.12 to 4.2 (#2201)
---
 contrib/storage-mongo/pom.xml                      |  10 +-
 .../drill/exec/store/bson/BsonRecordReader.java    |   0
 .../drill/exec/store/mongo/MongoGroupScan.java     |  92 ++----
 .../drill/exec/store/mongo/MongoRecordReader.java  |   6 +-
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  54 ++--
 .../exec/store/mongo/MongoStoragePluginConfig.java |  18 +-
 .../mongo/config/MongoPersistentStoreProvider.java |  20 +-
 .../store/mongo/schema/MongoSchemaFactory.java     |  47 ++-
 .../exec/store/bson/TestBsonRecordReader.java      |  21 +-
 .../drill/exec/store/mongo/MongoTestSuite.java     | 322 ++++++++++-----------
 ...TestMongoStoragePluginUsesCredentialsStore.java |  13 +-
 .../drill/exec/store/mongo/TestTableGenerator.java |  57 +---
 exec/java-exec/pom.xml                             |   5 -
 13 files changed, 275 insertions(+), 390 deletions(-)

diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 9746615..fdd6204 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -44,8 +44,8 @@
 
   <dependency>
     <groupId>org.mongodb</groupId>
-    <artifactId>mongo-java-driver</artifactId>
-    <version>3.12.8</version>
+    <artifactId>mongodb-driver-sync</artifactId>
+    <version>4.2.3</version>
   </dependency>
 
     <!-- Test dependency -->
@@ -64,9 +64,9 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>de.flapdoodle.embed</groupId>
-      <artifactId>de.flapdoodle.embed.mongo</artifactId>
-      <version>2.2.0</version>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>mongodb</artifactId>
+      <version>1.15.2</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
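
The swap above replaces the monolithic 3.x mongo-java-driver with the 4.x sync driver, whose clients come from the MongoClients factory rather than MongoClient constructors. A minimal smoke-test sketch of the new entry point, assuming a mongod on localhost:27017 (illustrative only, not part of this commit):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    public class SyncDriverSmokeTest {
      public static void main(String[] args) {
        // 3.x: new MongoClient(new MongoClientURI("mongodb://localhost:27017"))
        // 4.x: clients are built by the MongoClients factory and are AutoCloseable
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          Document reply = client.getDatabase("admin").runCommand(new Document("ping", 1));
          System.out.println(reply.toJson());
        }
      }
    }
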
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
rename to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 97fd312..ef3041b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -19,13 +19,14 @@ package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
@@ -36,13 +37,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.mongodb.client.MongoClient;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -68,7 +68,6 @@ import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import com.mongodb.MongoClient;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.FindIterable;
@@ -84,16 +83,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
-    @Override
-    public int compare(List<MongoSubScanSpec> list1,
-        List<MongoSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
+  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections
-      .reverseOrder(LIST_SIZE_COMPARATOR);
+  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
   private MongoStoragePlugin storagePlugin;
 
@@ -124,8 +116,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry,
-      @JsonProperty("maxRecords") int maxRecords) throws IOException,
-      ExecutionSetupException {
+      @JsonProperty("maxRecords") int maxRecords) throws IOException {
     this(userName,
         pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
         scanSpec, columns, maxRecords);
@@ -138,7 +129,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
     this.columns = columns;
-    this.storagePluginConfig.getConnection();
     this.maxRecords = maxRecords;
     init();
   }
@@ -175,11 +165,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private boolean isShardedCluster(MongoClient client) {
     MongoDatabase db = client.getDatabase(scanSpec.getDbName());
     String msg = db.runCommand(new Document("isMaster", 1)).getString("msg");
-    return msg == null ? false : msg.equals("isdbgrid");
+    return msg != null && msg.equals("isdbgrid");
   }
 
   @SuppressWarnings({ "rawtypes" })
-  private void init() throws IOException {
+  private void init() {
 
     List<String> h = storagePluginConfig.getHosts();
     List<ServerAddress> addresses = Lists.newArrayList();
@@ -216,33 +206,26 @@ public class MongoGroupScan extends AbstractGroupScan implements
       while (iterator.hasNext()) {
         Document chunkObj = iterator.next();
         String shardName = (String) chunkObj.get(SHARD);
-        String chunkId = (String) chunkObj.get(ID);
+        // creates hexadecimal string representation of ObjectId
+        String chunkId = chunkObj.get(ID).toString();
         filter = new Document(ID, shardName);
         FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
-        MongoCursor<Document> hostIterator = hostCursor.iterator();
-        while (hostIterator.hasNext()) {
-          Document hostObj = hostIterator.next();
+        for (Document hostObj : hostCursor) {
           String hostEntry = (String) hostObj.get(HOST);
           String[] tagAndHost = StringUtils.split(hostEntry, '/');
           String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
               tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
-          List<String> chunkHosts = Arrays.asList(hosts);
-          Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses), chunkHosts);
+          Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses));
           if (addressList == null) {
             addressList = Sets.newHashSet();
-            for (String host : chunkHosts) {
+            for (String host : hosts) {
               addressList.add(new ServerAddress(host));
             }
           }
           chunksMapping.put(chunkId, addressList);
           ServerAddress address = addressList.iterator().next();
-          List<ChunkInfo> chunkList = chunksInverseMapping.get(address
-              .getHost());
-          if (chunkList == null) {
-            chunkList = Lists.newArrayList();
-            chunksInverseMapping.put(address.getHost(), chunkList);
-          }
-          List<String> chunkHostsList = new ArrayList<String>();
+          List<ChunkInfo> chunkList = chunksInverseMapping.computeIfAbsent(address.getHost(), k -> new ArrayList<>());
+          List<String> chunkHostsList = new ArrayList<>();
           for (ServerAddress serverAddr : addressList) {
             chunkHostsList.add(serverAddr.toString());
           }
@@ -277,7 +260,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       // In a sharded environment, if a collection doesn't have any chunks, it is considered as an
       // unsharded collection and it will be stored in the primary shard of that database.
       if (!hasChunks) {
-        handleUnshardedCollection(getPrimaryShardInfo(client));
+        handleUnshardedCollection(getPrimaryShardInfo());
       }
     } else {
       handleUnshardedCollection(storagePluginConfig.getHosts());
@@ -297,21 +280,20 @@ public class MongoGroupScan extends AbstractGroupScan implements
     String host = hosts.get(0);
     ServerAddress address = new ServerAddress(host);
     ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
-    chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
-    chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+    chunkInfo.setMinFilters(Collections.emptyMap());
+    chunkInfo.setMaxFilters(Collections.emptyMap());
     List<ChunkInfo> chunksList = Lists.newArrayList();
     chunksList.add(chunkInfo);
     chunksInverseMapping.put(address.getHost(), chunksList);
   }
 
-  private List<String> getPrimaryShardInfo(MongoClient client) {
+  private List<String> getPrimaryShardInfo() {
     MongoDatabase database = storagePlugin.getClient().getDatabase(CONFIG);
     //Identify the primary shard of the queried database.
     MongoCollection<Document> collection = database.getCollection(DATABASES);
     Bson filter = new Document(ID, this.scanSpec.getDbName());
     Bson projection = new Document(PRIMARY, select);
-    Document document = collection.find(filter).projection(projection).first();
-    Preconditions.checkNotNull(document);
+    Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
     String shardName = document.getString(PRIMARY);
     Preconditions.checkNotNull(shardName);
 
@@ -319,8 +301,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
     filter = new Document(ID, shardName);
     projection = new Document(HOST, select);
-    Document hostInfo = shardsCol.find(filter).projection(projection).first();
-    Preconditions.checkNotNull(hostInfo);
+    Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
     String hostEntry = hostInfo.getString(HOST);
     Preconditions.checkNotNull(hostEntry);
 
@@ -331,10 +312,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   @SuppressWarnings("unchecked")
-  private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) {
+  private Set<ServerAddress> getPreferredHosts(MongoClient client) {
     Set<ServerAddress> addressList = Sets.newHashSet();
     MongoDatabase db = client.getDatabase(scanSpec.getDbName());
-    ReadPreference readPreference = client.getReadPreference();
+    ReadPreference readPreference = db.getReadPreference();
     Document command = db.runCommand(new Document("isMaster", 1));
 
     final String primaryHost = command.getString("primary");
@@ -390,8 +371,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints)
-      throws PhysicalOperatorSetupException {
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
     logger.debug("Incoming endpoints :" + endpoints);
     watch.reset();
     watch.start();
@@ -412,14 +392,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
     Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
 
     for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new ArrayList<MongoSubScanSpec>(
+      endpointFragmentMapping.put(i, new ArrayList<>(
           maxPerEndpointSlot));
       String hostname = endpoints.get(i).getAddress();
-      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
-      if (hostIndexQueue == null) {
-        hostIndexQueue = Lists.newLinkedList();
-        endpointHostIndexListMap.put(hostname, hostIndexQueue);
-      }
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.computeIfAbsent(hostname, k -> new LinkedList<>());
       hostIndexQueue.add(i);
     }
 
@@ -442,9 +418,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
     }
 
-    PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+    PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+    PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR_REV);
     for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
@@ -483,7 +459,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    MongoSubScanSpec subScanSpec = new MongoSubScanSpec()
+    return new MongoSubScanSpec()
         .setDbName(scanSpec.getDbName())
         .setCollectionName(scanSpec.getCollectionName())
         .setHosts(chunkInfo.getChunkLocList())
@@ -491,12 +467,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
         .setMaxFilters(chunkInfo.getMaxFilters())
         .setMaxRecords(maxRecords)
         .setFilter(scanSpec.getFilters());
-    return subScanSpec;
   }
 
   @Override
-  public MongoSubScan getSpecificScan(int minorFragmentId)
-      throws ExecutionSetupException {
+  public MongoSubScan getSpecificScan(int minorFragmentId) {
     return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
         endpointFragmentMapping.get(minorFragmentId), columns);
   }
@@ -530,8 +504,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       if (recordCount != 0) {
         //toJson should use client's codec, otherwise toJson could fail on
         // some types not known to DocumentCodec, e.g. DBRef.
-        final DocumentCodec codec =
-            new DocumentCodec(client.getMongoClientOptions().getCodecRegistry(), new BsonTypeClassMap());
+        DocumentCodec codec = new DocumentCodec(db.getCodecRegistry(), new BsonTypeClassMap());
         String json = collection.find().first().toJson(codec);
         approxDiskCost = json.getBytes().length * recordCount;
       }
@@ -542,8 +515,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
-      throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new MongoGroupScan(this);
   }
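
The isShardedCluster() rewrite above keeps the long-standing detection idiom: a mongos router answers the isMaster command with msg set to "isdbgrid", while a plain mongod omits the field. A standalone sketch of that check under the 4.x driver, assuming a reachable server on localhost:27017:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    public class MongosDetector {
      public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          // "isdbgrid" in the isMaster reply is only ever reported by a mongos router
          Document reply = client.getDatabase("test").runCommand(new Document("isMaster", 1));
          System.out.println("sharded cluster: " + "isdbgrid".equals(reply.getString("msg")));
        }
      }
    }
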
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 9d8b5bb..b06fe36 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -47,7 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import com.mongodb.MongoClient;
+import com.mongodb.client.MongoClient;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -82,7 +82,7 @@ public class MongoRecordReader extends AbstractRecordReader {
 
     fields = new Document();
     // exclude _id field, if not mentioned by user.
-    fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+    fields.put(DrillMongoConstants.ID, 0);
     setColumns(projectedColumns);
     fragmentContext = context;
     this.plugin = plugin;
@@ -107,7 +107,7 @@ public class MongoRecordReader extends AbstractRecordReader {
       for (SchemaPath column : projectedColumns) {
         String fieldName = column.getRootSegment().getPath();
         transformed.add(column);
-        this.fields.put(fieldName, Integer.valueOf(1));
+        this.fields.put(fieldName, 1);
       }
     } else {
      // Take all the fields including the _id
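
The 0/1 values unboxed above form a standard MongoDB projection document: 0 excludes a field, 1 includes it. A short sketch of the same idiom outside the reader, with hypothetical database and collection names:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class ProjectionDemo {
      public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          MongoCollection<Document> coll = client.getDatabase("employee").getCollection("empinfo");
          // exclude _id, include only employee_id -- mirrors the fields map the reader builds
          Document fields = new Document("_id", 0).append("employee_id", 1);
          System.out.println(coll.find().projection(fields).first());
        }
      }
    }
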
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 362ac51..da55907 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.store.mongo;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClients;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URLEncoder;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public class MongoStoragePlugin extends AbstractStoragePlugin {
@@ -60,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   private final MongoStoragePluginConfig mongoConfig;
   private final MongoSchemaFactory schemaFactory;
   private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
-  private final MongoClientURI clientURI;
+  private final ConnectionString clientURI;
 
   public MongoStoragePlugin(
       MongoStoragePluginConfig mongoConfig,
@@ -69,16 +72,17 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     super(context, name);
     this.mongoConfig = mongoConfig;
     String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
-    this.clientURI = new MongoClientURI(connection);
+    this.clientURI = new ConnectionString(connection);
     this.addressClientMap = CacheBuilder.newBuilder()
-      .expireAfterAccess(24, TimeUnit.HOURS)
-      .removalListener(new AddressCloser()).build();
+        .expireAfterAccess(24, TimeUnit.HOURS)
+        .removalListener(new AddressCloser())
+        .build();
     this.schemaFactory = new MongoSchemaFactory(this, name);
   }
 
   private String addCredentialsFromCredentialsProvider(String connection, String name) {
-    MongoClientURI parsed = new MongoClientURI(connection);
-    if (parsed.getCredentials() == null) {
+    ConnectionString parsed = new ConnectionString(connection);
+    if (parsed.getCredential() == null) {
       UsernamePasswordCredentials credentials = getUsernamePasswordCredentials(name);
       try {
         // The default connection has the name "mongo" but multiple connections can be added;
@@ -148,10 +152,6 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     }
   }
 
-  public MongoClient getClient(String host) {
-    return getClient(Collections.singletonList(new ServerAddress(host)));
-  }
-
   public MongoClient getClient() {
     List<String> hosts = clientURI.getHosts();
     List<ServerAddress> addresses = Lists.newArrayList();
@@ -163,22 +163,24 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
 
   public synchronized MongoClient getClient(List<ServerAddress> addresses) {
     // Take the first replica from the replicated servers
-    final ServerAddress serverAddress = addresses.get(0);
-    final MongoCredential credential = clientURI.getCredentials();
+    ServerAddress serverAddress = addresses.get(0);
+    MongoCredential credential = clientURI.getCredential();
     String userName = credential == null ? null : credential.getUserName();
     MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
-    MongoClient client = addressClientMap.getIfPresent(key);
-    if (client == null) {
-      if (credential != null) {
-        client = new MongoClient(addresses, credential, clientURI.getOptions());
-      } else {
-        client = new MongoClient(addresses, clientURI.getOptions());
-      }
-      addressClientMap.put(key, client);
-      logger.debug("Created connection to {}.", key.toString());
-      logger.debug("Number of open connections {}.", addressClientMap.size());
+    try {
+      return addressClientMap.get(key, () -> {
+        logger.info("Created connection to {}.", key);
+        logger.info("Number of open connections {}.", addressClientMap.size());
+        MongoClientSettings.Builder settings = MongoClientSettings.builder()
+            .applyToClusterSettings(builder -> builder.hosts(addresses));
+        if (credential != null) {
+          settings.credential(credential);
+        }
+        return MongoClients.create(settings.build());
+      });
+    } catch (ExecutionException e) {
+      throw new DrillRuntimeException(e);
     }
-    return client;
   }
 
   @Override
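
The getClient() rewrite above folds client construction into Guava's Cache.get(key, loader), so a client is created at most once per address/user pair, and builds it through the 4.x MongoClientSettings builder instead of the removed MongoClient constructors. A self-contained sketch of the same pattern, with hypothetical host and credentials:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;
    import com.mongodb.MongoClientSettings;
    import com.mongodb.MongoCredential;
    import com.mongodb.ServerAddress;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    public class CachedClientDemo {
      public static void main(String[] args) throws ExecutionException {
        // close evicted clients, much like the plugin's AddressCloser removal listener
        Cache<String, MongoClient> clients = CacheBuilder.newBuilder()
            .expireAfterAccess(24, TimeUnit.HOURS)
            .removalListener((RemovalListener<String, MongoClient>) n -> n.getValue().close())
            .build();
        List<ServerAddress> hosts = Collections.singletonList(new ServerAddress("localhost", 27017));
        MongoCredential credential = MongoCredential.createCredential("user", "admin", "secret".toCharArray());
        // the loader runs only on a cache miss for this key
        MongoClient client = clients.get("localhost:27017/user", () -> {
          MongoClientSettings.Builder settings = MongoClientSettings.builder()
              .applyToClusterSettings(builder -> builder.hosts(hosts));
          settings.credential(credential);
          return MongoClients.create(settings.build());
        });
        System.out.println(client);
      }
    }
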
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index 77bca4c..2ceb0d8 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -23,9 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientURI;
-import com.mongodb.MongoCredential;
+import com.mongodb.ConnectionString;
 import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -38,14 +36,14 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   private final String connection;
 
   @JsonIgnore
-  private final MongoClientURI clientURI;
+  private final ConnectionString clientURI;
 
   @JsonCreator
   public MongoStoragePluginConfig(@JsonProperty("connection") String connection,
       @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
     super(getCredentialsProvider(credentialsProvider), credentialsProvider == null);
     this.connection = connection;
-    this.clientURI = new MongoClientURI(connection);
+    this.clientURI = new ConnectionString(connection);
   }
 
   @Override
@@ -66,16 +64,6 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   }
 
   @JsonIgnore
-  public MongoCredential getMongoCredentials() {
-    return clientURI.getCredentials();
-  }
-
-  @JsonIgnore
-  public MongoClientOptions getMongoOptions() {
-    return clientURI.getOptions();
-  }
-
-  @JsonIgnore
   public List<String> getHosts() {
     return clientURI.getHosts();
   }
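
ConnectionString is the 4.x replacement for MongoClientURI used throughout this commit; note the singular getCredential() (getCredentials() is gone) and that several accessors are now nullable. A tiny sketch, with made-up hosts:

    import com.mongodb.ConnectionString;

    public class ConnectionStringDemo {
      public static void main(String[] args) {
        ConnectionString cs = new ConnectionString("mongodb://user:secret@h1:27017,h2:27018/drill");
        System.out.println(cs.getHosts());      // [h1:27017, h2:27018]
        System.out.println(cs.getCredential()); // the single parsed credential, or null
        System.out.println(cs.getDatabase());   // "drill"; null when the URI has no path
      }
    }
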
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
index 1a082ac..80d0465 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.store.mongo.config;
 
 import java.io.IOException;
+import java.util.Objects;
 
-import org.apache.drill.exec.exception.StoreException;
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
 import org.apache.drill.exec.store.mongo.DrillMongoConstants;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
@@ -28,8 +31,6 @@ import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvide
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
 import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -37,9 +38,6 @@ import com.mongodb.client.model.Indexes;
 
 public class MongoPersistentStoreProvider extends BasePersistentStoreProvider {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MongoPersistentStoreProvider.class);
-
   static final String pKey = "pKey";
 
   private MongoClient client;
@@ -48,16 +46,16 @@ public class MongoPersistentStoreProvider extends BasePersistentStoreProvider {
 
   private final String mongoURL;
 
-  public MongoPersistentStoreProvider(PersistentStoreRegistry registry) throws StoreException {
+  public MongoPersistentStoreProvider(PersistentStoreRegistry<?> registry) {
     mongoURL = registry.getConfig().getString(DrillMongoConstants.SYS_STORE_PROVIDER_MONGO_URL);
   }
 
   @Override
   public void start() throws IOException {
-    MongoClientURI clientURI = new MongoClientURI(mongoURL);
-    client = new MongoClient(clientURI);
-    MongoDatabase db = client.getDatabase(clientURI.getDatabase());
-    collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED);
+    ConnectionString clientURI = new ConnectionString(mongoURL);
+    client = MongoClients.create(clientURI);
+    MongoDatabase db = client.getDatabase(Objects.requireNonNull(clientURI.getDatabase()));
+    collection = db.getCollection(Objects.requireNonNull(clientURI.getCollection())).withWriteConcern(WriteConcern.JOURNALED);
     Bson index = Indexes.ascending(pKey);
     collection.createIndex(index);
   }
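
The store bootstrap above also shows why Objects.requireNonNull appears: ConnectionString.getDatabase() and getCollection() are nullable in the 4.x API. A sketch of the same bootstrap flow, assuming a hypothetical drill.sys_store path in the URI:

    import com.mongodb.ConnectionString;
    import com.mongodb.WriteConcern;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Indexes;
    import org.bson.Document;
    import java.util.Objects;

    public class StoreBootstrapDemo {
      public static void main(String[] args) {
        // a "database.collection" URI path populates both getDatabase() and getCollection()
        ConnectionString uri = new ConnectionString("mongodb://localhost:27017/drill.sys_store");
        try (MongoClient client = MongoClients.create(uri)) {
          MongoCollection<Document> store = client
              .getDatabase(Objects.requireNonNull(uri.getDatabase()))
              .getCollection(Objects.requireNonNull(uri.getCollection()))
              .withWriteConcern(WriteConcern.JOURNALED);
          store.createIndex(Indexes.ascending("pKey"));
        }
      }
    }
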
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index f5623d8..cc071a4 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.mongo.schema;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,11 +25,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -45,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
 import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import com.mongodb.MongoException;
@@ -57,15 +53,15 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
 
   private static final String DATABASES = "databases";
 
-  private LoadingCache<String, List<String>> databases;
-  private LoadingCache<String, List<String>> tableNameLoader;
-  private Map<String, String> schemaNameMap;
+  private final LoadingCache<String, List<String>> databases;
+  private final LoadingCache<String, List<String>> tableNameLoader;
+  private final Map<String, String> schemaNameMap;
   private final MongoStoragePlugin plugin;
 
-  public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+  public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) {
     super(schemaName);
     this.plugin = plugin;
-    this.schemaNameMap = new HashMap<String, String>();
+    this.schemaNameMap = new HashMap<>();
 
     databases = CacheBuilder //
         .newBuilder() //
@@ -81,26 +77,23 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
   private class DatabaseLoader extends CacheLoader<String, List<String>> {
 
     @Override
-    public List<String> load(String key) throws Exception {
+    public List<String> load(String key) {
       if (!DATABASES.equals(key)) {
         throw new UnsupportedOperationException();
       }
       try {
         List<String> dbNames = new ArrayList<>();
-        plugin.getClient().listDatabaseNames().forEach(new Consumer<String>() {
-          @Override
-          public void accept(String name) {
-            // 1. Schemas in drill are case insensitive and stored in lower case.
-            dbNames.add(name.toLowerCase());
-            /**
-             * 2. Support database name with capital letters.
-             * case 1: "show tables from mongo.HELLO", Should using the lower case name
-             *  to resolve the schema lookup in `CalciteSchema`.
-             * case 2: "select * from mongo.HEllO.myTable", Must be using origin name
-             *  to create `MongoScanSpec` and initial connection in `MongoRecordReader`.
-             */
-            schemaNameMap.put(name.toLowerCase(), name);
-          }
+        plugin.getClient().listDatabaseNames().forEach(name -> {
+          // 1. Schemas in drill are case insensitive and stored in lower case.
+          dbNames.add(name.toLowerCase());
+          /*
+           * 2. Support database name with capital letters.
+           * case 1: "show tables from mongo.HELLO", Should using the lower case name
+           *  to resolve the schema lookup in `CalciteSchema`.
+           * case 2: "select * from mongo.HEllO.myTable", Must be using origin name
+           *  to create `MongoScanSpec` and initial connection in `MongoRecordReader`.
+           */
+          schemaNameMap.put(name.toLowerCase(), name);
         });
         return dbNames;
       } catch (MongoException me) {
@@ -117,7 +110,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
   private class TableNameLoader extends CacheLoader<String, List<String>> {
 
     @Override
-    public List<String> load(String dbName) throws Exception {
+    public List<String> load(String dbName) {
       try {
         MongoDatabase db = plugin.getClient().getDatabase(schemaNameMap.get(dbName));
         List<String> collectionNames = new ArrayList<>();
@@ -134,7 +127,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     MongoSchema schema = new MongoSchema(getName());
     SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
@@ -145,7 +138,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
     private final Map<String, MongoDatabaseSchema> schemaMap = Maps.newHashMap();
 
     public MongoSchema(String name) {
-      super(ImmutableList.<String> of(), name);
+      super(Collections.emptyList(), name);
     }
 
     @Override
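
The schema factory keeps its Guava LoadingCache design; the commit only tightens it (final fields, lambdas, no checked exceptions on load). A compact sketch of the caching pattern itself, with a stubbed loader in place of the real Mongo client call:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class TableNameCacheDemo {
      public static void main(String[] args) {
        LoadingCache<String, List<String>> tableNames = CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.MINUTES)
            .build(new CacheLoader<String, List<String>>() {
              @Override
              public List<String> load(String dbName) {
                // the real loader lists collection names through plugin.getClient()
                return Arrays.asList("empinfo", "donuts");
              }
            });
        System.out.println(tableNames.getUnchecked("employee"));
      }
    }
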
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
similarity index 96%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
rename to contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
index 2c2363c..1429b14 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
@@ -17,13 +17,14 @@
  */
 package org.apache.drill.exec.store.bson;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.ZoneOffset;
-import java.util.Arrays;
 
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -59,14 +60,14 @@ import org.junit.Test;
 public class TestBsonRecordReader extends BaseTest {
   private BufferAllocator allocator;
   private VectorContainerWriter writer;
-  private TestOutputMutator mutator;
+
   private BufferManager bufferManager;
   private BsonRecordReader bsonReader;
 
   @Before
   public void setUp() {
     allocator = new RootAllocator(Long.MAX_VALUE);
-    mutator = new TestOutputMutator(allocator);
+    TestOutputMutator mutator = new TestOutputMutator(allocator);
     writer = new VectorContainerWriter(mutator);
     bufferManager = new BufferManagerImpl(allocator);
     bsonReader = new BsonRecordReader(bufferManager.getManagedBuffer(1024), false, false);
@@ -134,7 +135,7 @@ public class TestBsonRecordReader extends BaseTest {
     bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
     SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
     byte[] readByteArray = mapReader.reader("_idKey").readByteArray();
-    assertTrue(Arrays.equals(value.getValue().toByteArray(), readByteArray));
+    assertArrayEquals(value.getValue().toByteArray(), readByteArray);
   }
 
   @Test
@@ -144,7 +145,7 @@ public class TestBsonRecordReader extends BaseTest {
     writer.reset();
     bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
     SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
-    assertEquals(null, mapReader.reader("nullKey").readObject());
+    assertNull(mapReader.reader("nullKey").readObject());
   }
 
   @Test
@@ -154,7 +155,7 @@ public class TestBsonRecordReader extends BaseTest {
     writer.reset();
     bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
     SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
-    assertEquals(12.35d, mapReader.reader("doubleKey").readDouble().doubleValue(), 0.00001);
+    assertEquals(12.35d, mapReader.reader("doubleKey").readDouble(), 0.00001);
   }
 
   @Test
@@ -251,11 +252,11 @@ public class TestBsonRecordReader extends BaseTest {
     writer.reset();
     bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
     SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
-    assertTrue(Arrays.equals(bytes, mapReader.reader("binaryKey").readByteArray()));
+    assertArrayEquals(bytes, mapReader.reader("binaryKey").readByteArray());
     assertEquals("binaryStringValue", mapReader.reader("binaryStringKey").readText().toString());
-    assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble().doubleValue(), 0);
+    assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble(), 0);
     FieldReader reader = mapReader.reader("bsonBoolean");
-    assertEquals(true, reader.readBoolean().booleanValue());
+    assertEquals(true, reader.readBoolean());
   }
 
   @Test
@@ -292,7 +293,7 @@ public class TestBsonRecordReader extends BaseTest {
     try {
       writer.close();
     } catch (Exception e) {
-
+      // noop
     }
 
     bufferManager.close();
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
index 14c4f56..2af1c3f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
@@ -18,32 +18,17 @@
 package org.apache.drill.exec.store.mongo;
 
 import com.mongodb.BasicDBObject;
-import com.mongodb.MongoClient;
-import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.IndexOptions;
 import com.mongodb.client.model.Indexes;
-import de.flapdoodle.embed.mongo.Command;
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.IMongoCmdOptions;
-import de.flapdoodle.embed.mongo.config.IMongodConfig;
-import de.flapdoodle.embed.mongo.config.IMongosConfig;
-import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.MongosConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.config.RuntimeConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Storage;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.mongo.tests.MongosSystemForTestFactory;
-import de.flapdoodle.embed.process.config.IRuntimeConfig;
-import de.flapdoodle.embed.process.runtime.Network;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.drill.test.BaseTest;
 import org.apache.hadoop.conf.Configuration;
 import org.bson.Document;
@@ -55,15 +40,19 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.images.builder.Transferable;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URLEncoder;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
@@ -82,152 +71,145 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
   private static final Logger logger = LoggerFactory.getLogger(MongoTestSuite.class);
   protected static MongoClient mongoClient;
 
-  private static boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
-  private static boolean authEnabled = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.authEnabled", "false"));
+  private static final boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
   private static volatile String connectionURL = null;
-  private static volatile AtomicInteger initCount = new AtomicInteger(0);
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+
+  private static ContainerManager containerManager;
 
   public static String getConnectionURL() {
     return connectionURL;
   }
 
-  private static class DistributedMode {
-    private static MongosSystemForTestFactory mongosTestFactory;
-
-    private static String setup() throws Exception {
-      // creating configServers
-      List<IMongodConfig> configServers = new ArrayList<>(1);
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_1_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_2_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_3_PORT));
-
-      // creating replicaSets
-      // A LinkedHashMap ensures that the config servers are started first.
-      Map<String, List<IMongodConfig>> replicaSets = new LinkedHashMap<>();
-
-      List<IMongodConfig> replicaSet1 = new ArrayList<>();
-      replicaSet1.add(crateIMongodConfig(MONGOD_1_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_2_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_3_PORT, false, REPLICA_SET_1_NAME));
-
-      List<IMongodConfig> replicaSet2 = new ArrayList<>();
-      replicaSet2.add(crateIMongodConfig(MONGOD_4_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_5_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_6_PORT, false, REPLICA_SET_2_NAME));
-
-      replicaSets.put(CONFIG_REPLICA_SET, configServers);
-      replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
-      replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
-
-      // create mongo shards
-      IMongosConfig mongosConfig = createIMongosConfig();
-      mongosTestFactory = new MongosSystemForTestFactory(mongosConfig, replicaSets, Lists.newArrayList(),
-          EMPLOYEE_DB, EMPINFO_COLLECTION,"employee_id");
-      try {
-        mongosTestFactory.start();
-        mongoClient = (MongoClient) mongosTestFactory.getMongo();
-      } catch (Throwable e) {
-        logger.error(" Error while starting sharded cluster. ", e);
-        throw new Exception(" Error while starting sharded cluster. ", e);
-      }
-      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
-      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
-      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+  private abstract static class ContainerManager {
+    protected static List<GenericContainer<?>> mongoContainers;
 
-      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
-      return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
-    }
+    public abstract String setup() throws Exception;
 
-    private static IMongodConfig crateConfigServerConfig(int configServerPort) throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, CONFIG_REPLICA_SET, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, configServerPort, Network.localhostIsIPv6()))
-          .replication(replication)
-          .shardServer(false)
-          .configServer(true).cmdOptions(cmdOptions).build();
+    public void cleanup() {
+      mongoContainers.forEach(GenericContainer::stop);
     }
 
-    private static IMongodConfig crateIMongodConfig(int mongodPort, boolean flag, String replicaName)
-        throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, replicaName, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .shardServer(true)
-          .net(new Net(LOCALHOST, mongodPort, Network.localhostIsIPv6()))
-          .configServer(flag).replication(replication).cmdOptions(cmdOptions)
-          .build();
+    public GenericContainer<?> getMasterContainer() {
+      return mongoContainers.iterator().next();
     }
+  }
 
-    private static IMongosConfig createIMongosConfig() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      return new MongosConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .replicaSet(CONFIG_REPLICA_SET)
-          .configDB(LOCALHOST + ":" + CONFIG_SERVER_1_PORT)
-          .cmdOptions(cmdOptions).build();
-    }
+  private static class DistributedMode extends ContainerManager {
 
-    private static void cleanup() {
-      if (mongosTestFactory != null) {
-        // ignoring exception because sometimes provided time isn't enough to stop mongod processes
-        try {
-            mongosTestFactory.stop();
-          } catch (IllegalStateException e) {
-            logger.warn("Failed to close all mongod processes during provided timeout", e);
-          }
-      }
+    @Override
+    public String setup() throws Exception {
+      Network network = Network.newNetwork();
+
+      mongoContainers = Stream.of("m1", "m2", "m3")
+          .map(host -> new GenericContainer<>("mongo:4.4.5")
+              .withNetwork(network)
+              .withNetworkAliases(host)
+              .withExposedPorts(MONGOS_PORT)
+              .withCommand(String.format("mongod --port %d --shardsvr --replSet rs0 --bind_ip localhost,%s", MONGOS_PORT, host)))
+          .collect(Collectors.toList());
+
+      String configServerHost = "m4";
+      GenericContainer<?> configServer = new GenericContainer<>("mongo:4.4.5")
+          .withNetwork(network)
+          .withNetworkAliases(configServerHost)
+          .withExposedPorts(MONGOS_PORT)
+          .withCommand(String.format("mongod --configsvr --port %s --replSet rs0conf --bind_ip localhost,%s", MONGOS_PORT, configServerHost));
+
+      configServer.start();
+
+      Container.ExecResult execResult = configServer.execInContainer("/bin/bash", "-c",
+          String.format("echo 'rs.initiate({_id: \"rs0conf\",configsvr: true, members: [{ _id : 0, host : \"%s:%2$s\" }]})' | mongo --port %2$s", configServerHost, MONGOS_PORT));
+
+      logger.info(execResult.toString());
+
+      String mongosHost = "m5";
+      GenericContainer<?> mongos = new GenericContainer<>("mongo:4.4.5")
+          .withNetwork(network)
+          .withNetworkAliases(mongosHost)
+          .withExposedPorts(MONGOS_PORT)
+          .withCommand(String.format("mongos --configdb rs0conf/%1$s:%2$s --bind_ip localhost,%3$s --port %2$s", configServerHost, MONGOS_PORT, mongosHost));
+
+      mongos.start();
+
+      mongoContainers.forEach(GenericContainer::start);
+
+      GenericContainer<?> master = getMasterContainer();
+
+      execResult = master.execInContainer("/bin/bash", "-c",
+          String.format("mongo --port %1$s --eval 'printjson(rs.initiate({_id:\"rs0\"," +
+              "members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"},{_id:2,host:\"m3:%1$s\"}]}))' --quiet", MONGOS_PORT));
+      logger.info(execResult.toString());
+
+      execResult = master.execInContainer("/bin/bash", "-c",
+          String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGOS_PORT));
+      logger.info(execResult.toString());
+
+      execResult = mongos.execInContainer("/bin/bash", "-c", "echo 'sh.addShard(\"rs0/m1\")' | mongo --port " + MONGOS_PORT);
+      logger.info(execResult.toString());
+
+      String replicaSetUrl = String.format("mongodb://%s:%s", mongos.getContainerIpAddress(), mongos.getMappedPort(MONGOS_PORT));
+
+      mongoClient = MongoClients.create(replicaSetUrl);
+
+      logger.info("Execute list shards.");
+      execResult = master.execInContainer("/bin/bash", "-c", "mongo --eval 'db.adminCommand({ listShards: 1 })' --port " + MONGOS_PORT);
+      logger.info(execResult.toString());
+
+      // Enabled sharding at database level
+      logger.info("Enabled sharding at database level");
+      execResult = mongos.execInContainer("/bin/bash", "-c", String.format("mongo --eval 'db.adminCommand( {\n" +
+          "   enableSharding: \"%s\"\n" +
+          "} )'", EMPLOYEE_DB));
+      logger.info(execResult.toString());
+
+      // Create index in sharded collection
+      logger.info("Create index in sharded collection");
+      MongoDatabase db = mongoClient.getDatabase(EMPLOYEE_DB);
+      db.getCollection(EMPINFO_COLLECTION).createIndex(Indexes.ascending("employee_id"));
+
+      // Shard the collection
+      logger.info("Shard the collection: {}.{}", EMPLOYEE_DB, EMPINFO_COLLECTION);
+      execResult = mongos.execInContainer("/bin/bash", "-c", String.format(
+          "echo 'sh.shardCollection(\"%s.%s\", {\"employee_id\" : 1})' | mongo ", EMPLOYEE_DB, EMPINFO_COLLECTION));
+      logger.info(execResult.toString());
+      createMongoUser();
+      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
+      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
+      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+
+      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+      return String.format("mongodb://%s:%s", LOCALHOST, mongos.getMappedPort(MONGOS_PORT));
     }
   }
 
-  private static class SingleMode {
+  public static class SingleMode extends ContainerManager {
 
-    private static MongodExecutable mongodExecutable;
-    private static MongodProcess mongod;
+    @Override
+    public String setup() throws IOException {
+      mongoContainers = Collections.singletonList(new GenericContainer<>("mongo:4.4.5")
+          .withNetwork(Network.SHARED)
+          .withNetworkAliases("M1")
+          .withExposedPorts(MONGOS_PORT)
+          .withCommand("--replSet rs0 --bind_ip localhost,M1"));
 
-    private static String setup() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder().verbose(false)
-          .enableAuth(authEnabled).build();
+      mongoContainers.forEach(GenericContainer::start);
+      GenericContainer<?> master = getMasterContainer();
 
-      IMongodConfig mongodConfig = new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .cmdOptions(cmdOptions).build();
+      try {
+        master.execInContainer("/bin/bash", "-c",
+            "mongo --eval 'printjson(rs.initiate({_id:\"rs0\","
+                + "members:[{_id:0,host:\"M1:27017\"}]}))' "
+                + "--quiet");
+        master.execInContainer("/bin/bash", "-c",
+            "until mongo --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;"
+                + "do sleep 1;done");
+      } catch (Exception e) {
+        throw new IllegalStateException("Failed to initiate rs.", e);
+      }
 
-      // Configure to write Mongo message to the log. Change this to
-      // defaults() if needed for debugging; will write to the console instead.
-      IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder().defaultsWithLogger(
-          Command.MongoD, logger).build();
-      mongodExecutable = MongodStarter.getInstance(runtimeConfig).prepare(
-          mongodConfig);
-      mongod = mongodExecutable.start();
-      mongoClient = new MongoClient(new ServerAddress(LOCALHOST, MONGOS_PORT));
+      String connectionString = String.format("mongodb://%s:%d", master.getContainerIpAddress(), master.getFirstMappedPort());
+      mongoClient = MongoClients.create(connectionString);
 
       createMongoUser();
       createDbAndCollections(EMPLOYEE_DB, EMPINFO_COLLECTION, "employee_id");
@@ -235,16 +217,7 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
       createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
       createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
 
-      return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
-    }
-
-    private static void cleanup() {
-      if (mongod != null) {
-        mongod.stop();
-      }
-      if (mongodExecutable != null) {
-        mongodExecutable.stop();
-      }
+      return connectionString;
     }
   }
 
@@ -254,17 +227,22 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
       if (initCount.get() == 0) {
         if (distMode) {
           logger.info("Executing tests in distributed mode");
-          connectionURL = DistributedMode.setup();
+          containerManager = new DistributedMode();
         } else {
           logger.info("Executing tests in single mode");
-          connectionURL = SingleMode.setup();
+          containerManager = new SingleMode();
         }
+        connectionURL = containerManager.setup();
         // ToDo DRILL-7269: fix the way how data are imported for the sharded mongo cluster
-        TestTableGenerator.importData(EMPLOYEE_DB, EMPINFO_COLLECTION, EMP_DATA);
-        TestTableGenerator.importData(EMPLOYEE_DB, SCHEMA_CHANGE_COLLECTION, SCHEMA_CHANGE_DATA);
-        TestTableGenerator.importData(DONUTS_DB, DONUTS_COLLECTION, DONUTS_DATA);
-        TestTableGenerator.importData(DATATYPE_DB, DATATYPE_COLLECTION, DATATYPE_DATA);
-        TestTableGenerator.importData(ISSUE7820_DB, ISSUE7820_COLLECTION, EMP_DATA);
+        containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(EMP_DATA).toURI()), Charsets.UTF_8).read().getBytes()), EMP_DATA);
+        containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(SCHEMA_CHANGE_DATA).toURI()), Charsets.UTF_8).read().getBytes()), SCHEMA_CHANGE_DATA);
+        containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(DONUTS_DATA).toURI()), Charsets.UTF_8).read().getBytes()), DONUTS_DATA);
+        containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(DATATYPE_DATA).toURI()), Charsets.UTF_8).read().getBytes()), DATATYPE_DATA);
+        TestTableGenerator.importData(containerManager.getMasterContainer(), EMPLOYEE_DB, EMPINFO_COLLECTION, EMP_DATA);
+        TestTableGenerator.importData(containerManager.getMasterContainer(), EMPLOYEE_DB, SCHEMA_CHANGE_COLLECTION, SCHEMA_CHANGE_DATA);
+        TestTableGenerator.importData(containerManager.getMasterContainer(), DONUTS_DB, DONUTS_COLLECTION, DONUTS_DATA);
+        TestTableGenerator.importData(containerManager.getMasterContainer(), DATATYPE_DB, DATATYPE_COLLECTION, DATATYPE_DATA);
+        TestTableGenerator.importData(containerManager.getMasterContainer(), ISSUE7820_DB, ISSUE7820_COLLECTION, EMP_DATA);
       }
       initCount.incrementAndGet();
     }
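
Fixtures travel into the container before import: copyFileToContainer(Transferable.of(...)) writes the resource bytes to a path mongoimport can read. The four near-identical calls above fold naturally into a helper; a hedged sketch, class and method names assumed:

    import java.io.IOException;
    import org.apache.drill.shaded.guava.com.google.common.io.Resources;
    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.images.builder.Transferable;

    final class ContainerFiles {
      private ContainerFiles() {}

      // Read a classpath resource and place it at the same relative path
      // inside the container, where mongoimport will look for it.
      static void copyResource(GenericContainer<?> container, String resource) throws IOException {
        byte[] bytes = Resources.toByteArray(Resources.getResource(resource));
        container.copyFileToContainer(Transferable.of(bytes), resource);
      }
    }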
@@ -316,19 +294,15 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
       if (initCount.decrementAndGet() == 0) {
         try {
           if (mongoClient != null) {
-            mongoClient.dropDatabase(EMPLOYEE_DB);
-            mongoClient.dropDatabase(DATATYPE_DB);
-            mongoClient.dropDatabase(DONUTS_DB);
+            mongoClient.getDatabase(EMPLOYEE_DB).drop();
+            mongoClient.getDatabase(DATATYPE_DB).drop();
+            mongoClient.getDatabase(DONUTS_DB).drop();
           }
         } finally {
           if (mongoClient != null) {
             mongoClient.close();
           }
-          if (distMode) {
-            DistributedMode.cleanup();
-          } else {
-            SingleMode.cleanup();
-          }
+          containerManager.cleanup();
         }
       }
     }
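
The teardown change reflects an API removal: driver 4.x has no MongoClient.dropDatabase(String), so the suite drops through database handles instead. Dropping a database that does not exist is a server-side no-op, which keeps the cleanup unconditional. A tiny sketch with an assumed helper name:

    import com.mongodb.client.MongoClient;

    final class Databases {
      private Databases() {}

      static void drop(MongoClient client, String... names) {
        for (String name : names) {
          // Equivalent of the removed dropDatabase(name) call.
          client.getDatabase(name).drop();
        }
      }
    }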
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
index 673a36d..d1fd63f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.mongo;
 
 import com.mongodb.MongoCredential;
+import com.mongodb.client.internal.MongoClientImpl;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -25,9 +26,9 @@ import org.apache.drill.test.BaseTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 @Category({MongoStorageTest.class})
 public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
@@ -36,12 +37,12 @@ public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
     MongoStoragePlugin plugin = new MongoStoragePlugin(
         new MongoStoragePluginConfig(connection, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
         null, name);
-    List<MongoCredential> creds = plugin.getClient().getCredentialsList();
+    MongoClientImpl client = (MongoClientImpl) plugin.getClient();
+    MongoCredential cred = client.getSettings().getCredential();
     if (expectedUserName == null) {
-      assertEquals(0, creds.size());
+      assertNull(cred);
     } else {
-      assertEquals(1, creds.size());
-      MongoCredential cred = creds.get(0);
+      assertNotNull(cred);
       assertEquals(expectedUserName, cred.getUserName());
       assertEquals(expectedPassword, new String(cred.getPassword()));
     }
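
This test now casts to com.mongodb.client.internal.MongoClientImpl because the 4.x MongoClient interface no longer exposes credentials; they live on MongoClientSettings. A sketch of the settings-side view the assertions depend on, with an illustrative URI:

    import com.mongodb.ConnectionString;
    import com.mongodb.MongoClientSettings;
    import com.mongodb.MongoCredential;

    public class CredentialSketch {
      public static void main(String[] args) {
        MongoClientSettings settings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString("mongodb://alice:secret@localhost:27017/db"))
            .build();
        // getCredential() is null when the URI carries no user info,
        // which is the assertNull branch above.
        MongoCredential cred = settings.getCredential();
        System.out.println(cred == null ? "no credential" : cred.getUserName());
      }
    }

Keeping the built MongoClientSettings reachable from the plugin would avoid the cast into the driver's internal package, at the cost of widening the plugin's surface.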
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
index 6051fcd..c846343 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
@@ -17,64 +17,25 @@
  */
 package org.apache.drill.exec.store.mongo;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
 
-import de.flapdoodle.embed.mongo.Command;
-import de.flapdoodle.embed.mongo.MongoImportExecutable;
-import de.flapdoodle.embed.mongo.MongoImportProcess;
-import de.flapdoodle.embed.mongo.MongoImportStarter;
-import de.flapdoodle.embed.mongo.config.IMongoImportConfig;
-import de.flapdoodle.embed.mongo.config.MongoImportConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.config.RuntimeConfigBuilder;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.config.IRuntimeConfig;
-import de.flapdoodle.embed.process.runtime.Network;
+import java.io.IOException;
 
 public class TestTableGenerator implements MongoTestConstants {
 
   private static final Logger logger = LoggerFactory
       .getLogger(TestTableGenerator.class);
 
-  public static void importData(String dbName, String collectionName,
-                                String fileName) throws InterruptedException, IOException, URISyntaxException {
-    File jsonFile = new File(Resources.getResource(fileName).toURI());
-    generateTable(dbName, collectionName, jsonFile.getAbsolutePath(), true, true, false);
-  }
-
-  public static void generateTable(String dbName, String collection,
-      String jsonFile, Boolean jsonArray, Boolean upsert, Boolean drop)
-      throws InterruptedException, IOException {
-    logger.info("Started importing file {} into collection {} ", jsonFile,
-        collection);
-    IMongoImportConfig mongoImportConfig = new MongoImportConfigBuilder()
-        .version(Version.Main.V3_4)
-        .net(new Net(MONGOS_PORT, Network.localhostIsIPv6())).db(dbName)
-        .collection(collection).upsert(upsert).dropCollection(drop)
-        .jsonArray(jsonArray).importFile(jsonFile).build();
-    // Configure to write Mongo message to the log. Change this to
-    // .getDefaultInstance() if needed for debugging; will write to
-    // the console instead.
-    IRuntimeConfig rtConfig = new RuntimeConfigBuilder()
-        .defaultsWithLogger(Command.MongoImport, logger)
-        .daemonProcess(false)
-        .build();
-    MongoImportExecutable importExecutable = MongoImportStarter
-        .getInstance(rtConfig).prepare(mongoImportConfig);
-    MongoImportProcess importProcess = importExecutable.start();
-
-    // import is in a separate process, we should wait until the process exit
-    while (importProcess.isProcessRunning()) {
-        Thread.sleep(1000);
-    }
+  public static void importData(GenericContainer<?> mongo, String dbName, String collectionName,
+                                String fileName) throws InterruptedException, IOException {
+    Container.ExecResult execResult = mongo.execInContainer("/bin/bash", "-c",
+        "mongoimport --db " + dbName + " --collection " + collectionName + " --jsonArray --upsert --file " + fileName);
+    logger.info(execResult.toString());
 
-    logger.info("Imported file {} into collection {} ", jsonFile, collection);
+    logger.info("Imported file {} into collection {} ", fileName, collectionName);
   }
 
 }
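
importData() now shells out to the mongoimport binary bundled in the mongo image and only logs the ExecResult. Container.ExecResult also exposes the exit code and stderr, so a stricter variant can fail fast; a hedged sketch, helper name assumed:

    import java.io.IOException;
    import org.testcontainers.containers.Container;
    import org.testcontainers.containers.GenericContainer;

    final class StrictImport {
      private StrictImport() {}

      static void importOrFail(GenericContainer<?> mongo, String db, String collection, String file)
          throws IOException, InterruptedException {
        // Same flags as the command above, passed as an argv to skip the shell.
        Container.ExecResult result = mongo.execInContainer(
            "mongoimport", "--db", db, "--collection", collection,
            "--jsonArray", "--upsert", "--file", file);
        if (result.getExitCode() != 0) {
          throw new IllegalStateException("mongoimport failed: " + result.getStderr());
        }
      }
    }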
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index a08e172..65d7db4 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -198,11 +198,6 @@
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.mongodb</groupId>
-      <artifactId>mongo-java-driver</artifactId>
-      <version>3.12.7</version>
-    </dependency>
-    <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-afterburner</artifactId>
       <version>${jackson.version}</version>