You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/04/28 11:49:17 UTC
[drill] branch master updated: DRILL-7903: Update mongo driver from
3.12 to 4.2 (#2201)
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new ed0c304 DRILL-7903: Update mongo driver from 3.12 to 4.2 (#2201)
ed0c304 is described below
commit ed0c304389f880769066fea3ab8d55f7d1c8b5de
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Apr 28 14:49:04 2021 +0300
DRILL-7903: Update mongo driver from 3.12 to 4.2 (#2201)
---
contrib/storage-mongo/pom.xml | 10 +-
.../drill/exec/store/bson/BsonRecordReader.java | 0
.../drill/exec/store/mongo/MongoGroupScan.java | 92 ++----
.../drill/exec/store/mongo/MongoRecordReader.java | 6 +-
.../drill/exec/store/mongo/MongoStoragePlugin.java | 54 ++--
.../exec/store/mongo/MongoStoragePluginConfig.java | 18 +-
.../mongo/config/MongoPersistentStoreProvider.java | 20 +-
.../store/mongo/schema/MongoSchemaFactory.java | 47 ++-
.../exec/store/bson/TestBsonRecordReader.java | 21 +-
.../drill/exec/store/mongo/MongoTestSuite.java | 322 ++++++++++-----------
...TestMongoStoragePluginUsesCredentialsStore.java | 13 +-
.../drill/exec/store/mongo/TestTableGenerator.java | 57 +---
exec/java-exec/pom.xml | 5 -
13 files changed, 275 insertions(+), 390 deletions(-)
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 9746615..fdd6204 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -44,8 +44,8 @@
<dependency>
<groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.12.8</version>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>4.2.3</version>
</dependency>
<!-- Test dependency -->
@@ -64,9 +64,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>de.flapdoodle.embed</groupId>
- <artifactId>de.flapdoodle.embed.mongo</artifactId>
- <version>2.2.0</version>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <version>1.15.2</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
rename to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 97fd312..ef3041b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -19,13 +19,14 @@ package org.apache.drill.exec.store.mongo;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
@@ -36,13 +37,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.mongodb.client.MongoClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -68,7 +68,6 @@ import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import com.mongodb.MongoClient;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
@@ -84,16 +83,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
- @Override
- public int compare(List<MongoSubScanSpec> list1,
- List<MongoSubScanSpec> list2) {
- return list1.size() - list2.size();
- }
- };
+ private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections
- .reverseOrder(LIST_SIZE_COMPARATOR);
+ private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
private MongoStoragePlugin storagePlugin;
@@ -124,8 +116,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry,
- @JsonProperty("maxRecords") int maxRecords) throws IOException,
- ExecutionSetupException {
+ @JsonProperty("maxRecords") int maxRecords) throws IOException {
this(userName,
pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
scanSpec, columns, maxRecords);
@@ -138,7 +129,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
this.storagePluginConfig = storagePlugin.getConfig();
this.scanSpec = scanSpec;
this.columns = columns;
- this.storagePluginConfig.getConnection();
this.maxRecords = maxRecords;
init();
}
@@ -175,11 +165,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
private boolean isShardedCluster(MongoClient client) {
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
String msg = db.runCommand(new Document("isMaster", 1)).getString("msg");
- return msg == null ? false : msg.equals("isdbgrid");
+ return msg != null && msg.equals("isdbgrid");
}
@SuppressWarnings({ "rawtypes" })
- private void init() throws IOException {
+ private void init() {
List<String> h = storagePluginConfig.getHosts();
List<ServerAddress> addresses = Lists.newArrayList();
@@ -216,33 +206,26 @@ public class MongoGroupScan extends AbstractGroupScan implements
while (iterator.hasNext()) {
Document chunkObj = iterator.next();
String shardName = (String) chunkObj.get(SHARD);
- String chunkId = (String) chunkObj.get(ID);
+ // creates hexadecimal string representation of ObjectId
+ String chunkId = chunkObj.get(ID).toString();
filter = new Document(ID, shardName);
FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
- MongoCursor<Document> hostIterator = hostCursor.iterator();
- while (hostIterator.hasNext()) {
- Document hostObj = hostIterator.next();
+ for (Document hostObj : hostCursor) {
String hostEntry = (String) hostObj.get(HOST);
String[] tagAndHost = StringUtils.split(hostEntry, '/');
String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
- List<String> chunkHosts = Arrays.asList(hosts);
- Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses), chunkHosts);
+ Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses));
if (addressList == null) {
addressList = Sets.newHashSet();
- for (String host : chunkHosts) {
+ for (String host : hosts) {
addressList.add(new ServerAddress(host));
}
}
chunksMapping.put(chunkId, addressList);
ServerAddress address = addressList.iterator().next();
- List<ChunkInfo> chunkList = chunksInverseMapping.get(address
- .getHost());
- if (chunkList == null) {
- chunkList = Lists.newArrayList();
- chunksInverseMapping.put(address.getHost(), chunkList);
- }
- List<String> chunkHostsList = new ArrayList<String>();
+ List<ChunkInfo> chunkList = chunksInverseMapping.computeIfAbsent(address.getHost(), k -> new ArrayList<>());
+ List<String> chunkHostsList = new ArrayList<>();
for (ServerAddress serverAddr : addressList) {
chunkHostsList.add(serverAddr.toString());
}
@@ -277,7 +260,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
// In a sharded environment, if a collection doesn't have any chunks, it is considered as an
// unsharded collection and it will be stored in the primary shard of that database.
if (!hasChunks) {
- handleUnshardedCollection(getPrimaryShardInfo(client));
+ handleUnshardedCollection(getPrimaryShardInfo());
}
} else {
handleUnshardedCollection(storagePluginConfig.getHosts());
@@ -297,21 +280,20 @@ public class MongoGroupScan extends AbstractGroupScan implements
String host = hosts.get(0);
ServerAddress address = new ServerAddress(host);
ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
- chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
- chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+ chunkInfo.setMinFilters(Collections.emptyMap());
+ chunkInfo.setMaxFilters(Collections.emptyMap());
List<ChunkInfo> chunksList = Lists.newArrayList();
chunksList.add(chunkInfo);
chunksInverseMapping.put(address.getHost(), chunksList);
}
- private List<String> getPrimaryShardInfo(MongoClient client) {
+ private List<String> getPrimaryShardInfo() {
MongoDatabase database = storagePlugin.getClient().getDatabase(CONFIG);
//Identify the primary shard of the queried database.
MongoCollection<Document> collection = database.getCollection(DATABASES);
Bson filter = new Document(ID, this.scanSpec.getDbName());
Bson projection = new Document(PRIMARY, select);
- Document document = collection.find(filter).projection(projection).first();
- Preconditions.checkNotNull(document);
+ Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
String shardName = document.getString(PRIMARY);
Preconditions.checkNotNull(shardName);
@@ -319,8 +301,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
filter = new Document(ID, shardName);
projection = new Document(HOST, select);
- Document hostInfo = shardsCol.find(filter).projection(projection).first();
- Preconditions.checkNotNull(hostInfo);
+ Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
String hostEntry = hostInfo.getString(HOST);
Preconditions.checkNotNull(hostEntry);
@@ -331,10 +312,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
@SuppressWarnings("unchecked")
- private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) {
+ private Set<ServerAddress> getPreferredHosts(MongoClient client) {
Set<ServerAddress> addressList = Sets.newHashSet();
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
- ReadPreference readPreference = client.getReadPreference();
+ ReadPreference readPreference = db.getReadPreference();
Document command = db.runCommand(new Document("isMaster", 1));
final String primaryHost = command.getString("primary");
@@ -390,8 +371,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
@Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints)
- throws PhysicalOperatorSetupException {
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
logger.debug("Incoming endpoints :" + endpoints);
watch.reset();
watch.start();
@@ -412,14 +392,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
for (int i = 0; i < numSlots; ++i) {
- endpointFragmentMapping.put(i, new ArrayList<MongoSubScanSpec>(
+ endpointFragmentMapping.put(i, new ArrayList<>(
maxPerEndpointSlot));
String hostname = endpoints.get(i).getAddress();
- Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
- if (hostIndexQueue == null) {
- hostIndexQueue = Lists.newLinkedList();
- endpointHostIndexListMap.put(hostname, hostIndexQueue);
- }
+ Queue<Integer> hostIndexQueue = endpointHostIndexListMap.computeIfAbsent(hostname, k -> new LinkedList<>());
hostIndexQueue.add(i);
}
@@ -442,9 +418,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
}
- PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+ PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+ PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR_REV);
for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
if (listOfScan.size() < minPerEndpointSlot) {
@@ -483,7 +459,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
- MongoSubScanSpec subScanSpec = new MongoSubScanSpec()
+ return new MongoSubScanSpec()
.setDbName(scanSpec.getDbName())
.setCollectionName(scanSpec.getCollectionName())
.setHosts(chunkInfo.getChunkLocList())
@@ -491,12 +467,10 @@ public class MongoGroupScan extends AbstractGroupScan implements
.setMaxFilters(chunkInfo.getMaxFilters())
.setMaxRecords(maxRecords)
.setFilter(scanSpec.getFilters());
- return subScanSpec;
}
@Override
- public MongoSubScan getSpecificScan(int minorFragmentId)
- throws ExecutionSetupException {
+ public MongoSubScan getSpecificScan(int minorFragmentId) {
return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
endpointFragmentMapping.get(minorFragmentId), columns);
}
@@ -530,8 +504,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
if (recordCount != 0) {
//toJson should use client's codec, otherwise toJson could fail on
// some types not known to DocumentCodec, e.g. DBRef.
- final DocumentCodec codec =
- new DocumentCodec(client.getMongoClientOptions().getCodecRegistry(), new BsonTypeClassMap());
+ DocumentCodec codec = new DocumentCodec(db.getCodecRegistry(), new BsonTypeClassMap());
String json = collection.find().first().toJson(codec);
approxDiskCost = json.getBytes().length * recordCount;
}
@@ -542,8 +515,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
- throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MongoGroupScan(this);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 9d8b5bb..b06fe36 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -47,7 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import com.mongodb.MongoClient;
+import com.mongodb.client.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@@ -82,7 +82,7 @@ public class MongoRecordReader extends AbstractRecordReader {
fields = new Document();
// exclude _id field, if not mentioned by user.
- fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+ fields.put(DrillMongoConstants.ID, 0);
setColumns(projectedColumns);
fragmentContext = context;
this.plugin = plugin;
@@ -107,7 +107,7 @@ public class MongoRecordReader extends AbstractRecordReader {
for (SchemaPath column : projectedColumns) {
String fieldName = column.getRootSegment().getPath();
transformed.add(column);
- this.fields.put(fieldName, Integer.valueOf(1));
+ this.fields.put(fieldName, 1);
}
} else {
// Tale all the fields including the _id
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 362ac51..da55907 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.store.mongo;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClients;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLEncoder;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MongoStoragePlugin extends AbstractStoragePlugin {
@@ -60,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
private final MongoStoragePluginConfig mongoConfig;
private final MongoSchemaFactory schemaFactory;
private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
- private final MongoClientURI clientURI;
+ private final ConnectionString clientURI;
public MongoStoragePlugin(
MongoStoragePluginConfig mongoConfig,
@@ -69,16 +72,17 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
super(context, name);
this.mongoConfig = mongoConfig;
String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
- this.clientURI = new MongoClientURI(connection);
+ this.clientURI = new ConnectionString(connection);
this.addressClientMap = CacheBuilder.newBuilder()
- .expireAfterAccess(24, TimeUnit.HOURS)
- .removalListener(new AddressCloser()).build();
+ .expireAfterAccess(24, TimeUnit.HOURS)
+ .removalListener(new AddressCloser())
+ .build();
this.schemaFactory = new MongoSchemaFactory(this, name);
}
private String addCredentialsFromCredentialsProvider(String connection, String name) {
- MongoClientURI parsed = new MongoClientURI(connection);
- if (parsed.getCredentials() == null) {
+ ConnectionString parsed = new ConnectionString(connection);
+ if (parsed.getCredential() == null) {
UsernamePasswordCredentials credentials = getUsernamePasswordCredentials(name);
try {
// The default connection has the name "mongo" but multiple connections can be added;
@@ -148,10 +152,6 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
}
- public MongoClient getClient(String host) {
- return getClient(Collections.singletonList(new ServerAddress(host)));
- }
-
public MongoClient getClient() {
List<String> hosts = clientURI.getHosts();
List<ServerAddress> addresses = Lists.newArrayList();
@@ -163,22 +163,24 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
public synchronized MongoClient getClient(List<ServerAddress> addresses) {
// Take the first replica from the replicated servers
- final ServerAddress serverAddress = addresses.get(0);
- final MongoCredential credential = clientURI.getCredentials();
+ ServerAddress serverAddress = addresses.get(0);
+ MongoCredential credential = clientURI.getCredential();
String userName = credential == null ? null : credential.getUserName();
MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
- MongoClient client = addressClientMap.getIfPresent(key);
- if (client == null) {
- if (credential != null) {
- client = new MongoClient(addresses, credential, clientURI.getOptions());
- } else {
- client = new MongoClient(addresses, clientURI.getOptions());
- }
- addressClientMap.put(key, client);
- logger.debug("Created connection to {}.", key.toString());
- logger.debug("Number of open connections {}.", addressClientMap.size());
+ try {
+ return addressClientMap.get(key, () -> {
+ logger.info("Created connection to {}.", key);
+ logger.info("Number of open connections {}.", addressClientMap.size());
+ MongoClientSettings.Builder settings = MongoClientSettings.builder()
+ .applyToClusterSettings(builder -> builder.hosts(addresses));
+ if (credential != null) {
+ settings.credential(credential);
+ }
+ return MongoClients.create(settings.build());
+ });
+ } catch (ExecutionException e) {
+ throw new DrillRuntimeException(e);
}
- return client;
}
@Override
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index 77bca4c..2ceb0d8 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -23,9 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientURI;
-import com.mongodb.MongoCredential;
+import com.mongodb.ConnectionString;
import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
import org.apache.drill.common.logical.security.CredentialsProvider;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -38,14 +36,14 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
private final String connection;
@JsonIgnore
- private final MongoClientURI clientURI;
+ private final ConnectionString clientURI;
@JsonCreator
public MongoStoragePluginConfig(@JsonProperty("connection") String connection,
@JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
super(getCredentialsProvider(credentialsProvider), credentialsProvider == null);
this.connection = connection;
- this.clientURI = new MongoClientURI(connection);
+ this.clientURI = new ConnectionString(connection);
}
@Override
@@ -66,16 +64,6 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
}
@JsonIgnore
- public MongoCredential getMongoCredentials() {
- return clientURI.getCredentials();
- }
-
- @JsonIgnore
- public MongoClientOptions getMongoOptions() {
- return clientURI.getOptions();
- }
-
- @JsonIgnore
public List<String> getHosts() {
return clientURI.getHosts();
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
index 1a082ac..80d0465 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
@@ -18,8 +18,11 @@
package org.apache.drill.exec.store.mongo.config;
import java.io.IOException;
+import java.util.Objects;
-import org.apache.drill.exec.exception.StoreException;
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
import org.apache.drill.exec.store.mongo.DrillMongoConstants;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
@@ -28,8 +31,6 @@ import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvide
import org.bson.Document;
import org.bson.conversions.Bson;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -37,9 +38,6 @@ import com.mongodb.client.model.Indexes;
public class MongoPersistentStoreProvider extends BasePersistentStoreProvider {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(MongoPersistentStoreProvider.class);
-
static final String pKey = "pKey";
private MongoClient client;
@@ -48,16 +46,16 @@ public class MongoPersistentStoreProvider extends BasePersistentStoreProvider {
private final String mongoURL;
- public MongoPersistentStoreProvider(PersistentStoreRegistry registry) throws StoreException {
+ public MongoPersistentStoreProvider(PersistentStoreRegistry<?> registry) {
mongoURL = registry.getConfig().getString(DrillMongoConstants.SYS_STORE_PROVIDER_MONGO_URL);
}
@Override
public void start() throws IOException {
- MongoClientURI clientURI = new MongoClientURI(mongoURL);
- client = new MongoClient(clientURI);
- MongoDatabase db = client.getDatabase(clientURI.getDatabase());
- collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED);
+ ConnectionString clientURI = new ConnectionString(mongoURL);
+ client = MongoClients.create(clientURI);
+ MongoDatabase db = client.getDatabase(Objects.requireNonNull(clientURI.getDatabase()));
+ collection = db.getCollection(Objects.requireNonNull(clientURI.getCollection())).withWriteConcern(WriteConcern.JOURNALED);
Bson index = Indexes.ascending(pKey);
collection.createIndex(index);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index f5623d8..cc071a4 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.mongo.schema;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -26,11 +25,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
@@ -45,7 +42,6 @@ import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import com.mongodb.MongoException;
@@ -57,15 +53,15 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
private static final String DATABASES = "databases";
- private LoadingCache<String, List<String>> databases;
- private LoadingCache<String, List<String>> tableNameLoader;
- private Map<String, String> schemaNameMap;
+ private final LoadingCache<String, List<String>> databases;
+ private final LoadingCache<String, List<String>> tableNameLoader;
+ private final Map<String, String> schemaNameMap;
private final MongoStoragePlugin plugin;
- public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+ public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) {
super(schemaName);
this.plugin = plugin;
- this.schemaNameMap = new HashMap<String, String>();
+ this.schemaNameMap = new HashMap<>();
databases = CacheBuilder //
.newBuilder() //
@@ -81,26 +77,23 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
private class DatabaseLoader extends CacheLoader<String, List<String>> {
@Override
- public List<String> load(String key) throws Exception {
+ public List<String> load(String key) {
if (!DATABASES.equals(key)) {
throw new UnsupportedOperationException();
}
try {
List<String> dbNames = new ArrayList<>();
- plugin.getClient().listDatabaseNames().forEach(new Consumer<String>() {
- @Override
- public void accept(String name) {
- // 1. Schemas in drill are case insensitive and stored in lower case.
- dbNames.add(name.toLowerCase());
- /**
- * 2. Support database name with capital letters.
- * case 1: "show tables from mongo.HELLO", Should using the lower case name
- * to resolve the schema lookup in `CalciteSchema`.
- * case 2: "select * from mongo.HEllO.myTable", Must be using origin name
- * to create `MongoScanSpec` and initial connection in `MongoRecordReader`.
- */
- schemaNameMap.put(name.toLowerCase(), name);
- }
+ plugin.getClient().listDatabaseNames().forEach(name -> {
+ // 1. Schemas in drill are case insensitive and stored in lower case.
+ dbNames.add(name.toLowerCase());
+ /*
+ * 2. Support database name with capital letters.
+ * case 1: "show tables from mongo.HELLO", Should using the lower case name
+ * to resolve the schema lookup in `CalciteSchema`.
+ * case 2: "select * from mongo.HEllO.myTable", Must be using origin name
+ * to create `MongoScanSpec` and initial connection in `MongoRecordReader`.
+ */
+ schemaNameMap.put(name.toLowerCase(), name);
});
return dbNames;
} catch (MongoException me) {
@@ -117,7 +110,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
private class TableNameLoader extends CacheLoader<String, List<String>> {
@Override
- public List<String> load(String dbName) throws Exception {
+ public List<String> load(String dbName) {
try {
MongoDatabase db = plugin.getClient().getDatabase(schemaNameMap.get(dbName));
List<String> collectionNames = new ArrayList<>();
@@ -134,7 +127,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
}
@Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
MongoSchema schema = new MongoSchema(getName());
SchemaPlus hPlus = parent.add(getName(), schema);
schema.setHolder(hPlus);
@@ -145,7 +138,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
private final Map<String, MongoDatabaseSchema> schemaMap = Maps.newHashMap();
public MongoSchema(String name) {
- super(ImmutableList.<String> of(), name);
+ super(Collections.emptyList(), name);
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
similarity index 96%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
rename to contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
index 2c2363c..1429b14 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
@@ -17,13 +17,14 @@
*/
package org.apache.drill.exec.store.bson;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.ZoneOffset;
-import java.util.Arrays;
import org.apache.drill.exec.memory.RootAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -59,14 +60,14 @@ import org.junit.Test;
public class TestBsonRecordReader extends BaseTest {
private BufferAllocator allocator;
private VectorContainerWriter writer;
- private TestOutputMutator mutator;
+
private BufferManager bufferManager;
private BsonRecordReader bsonReader;
@Before
public void setUp() {
allocator = new RootAllocator(Long.MAX_VALUE);
- mutator = new TestOutputMutator(allocator);
+ TestOutputMutator mutator = new TestOutputMutator(allocator);
writer = new VectorContainerWriter(mutator);
bufferManager = new BufferManagerImpl(allocator);
bsonReader = new BsonRecordReader(bufferManager.getManagedBuffer(1024), false, false);
@@ -134,7 +135,7 @@ public class TestBsonRecordReader extends BaseTest {
bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
byte[] readByteArray = mapReader.reader("_idKey").readByteArray();
- assertTrue(Arrays.equals(value.getValue().toByteArray(), readByteArray));
+ assertArrayEquals(value.getValue().toByteArray(), readByteArray);
}
@Test
@@ -144,7 +145,7 @@ public class TestBsonRecordReader extends BaseTest {
writer.reset();
bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
- assertEquals(null, mapReader.reader("nullKey").readObject());
+ assertNull(mapReader.reader("nullKey").readObject());
}
@Test
@@ -154,7 +155,7 @@ public class TestBsonRecordReader extends BaseTest {
writer.reset();
bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
- assertEquals(12.35d, mapReader.reader("doubleKey").readDouble().doubleValue(), 0.00001);
+ assertEquals(12.35d, mapReader.reader("doubleKey").readDouble(), 0.00001);
}
@Test
@@ -251,11 +252,11 @@ public class TestBsonRecordReader extends BaseTest {
writer.reset();
bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
- assertTrue(Arrays.equals(bytes, mapReader.reader("binaryKey").readByteArray()));
+ assertArrayEquals(bytes, mapReader.reader("binaryKey").readByteArray());
assertEquals("binaryStringValue", mapReader.reader("binaryStringKey").readText().toString());
- assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble().doubleValue(), 0);
+ assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble(), 0);
FieldReader reader = mapReader.reader("bsonBoolean");
- assertEquals(true, reader.readBoolean().booleanValue());
+ assertEquals(true, reader.readBoolean());
}
@Test
@@ -292,7 +293,7 @@ public class TestBsonRecordReader extends BaseTest {
try {
writer.close();
} catch (Exception e) {
-
+ // noop
}
bufferManager.close();
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
index 14c4f56..2af1c3f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
@@ -18,32 +18,17 @@
package org.apache.drill.exec.store.mongo;
import com.mongodb.BasicDBObject;
-import com.mongodb.MongoClient;
-import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
-import de.flapdoodle.embed.mongo.Command;
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.IMongoCmdOptions;
-import de.flapdoodle.embed.mongo.config.IMongodConfig;
-import de.flapdoodle.embed.mongo.config.IMongosConfig;
-import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.MongosConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.config.RuntimeConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Storage;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.mongo.tests.MongosSystemForTestFactory;
-import de.flapdoodle.embed.process.config.IRuntimeConfig;
-import de.flapdoodle.embed.process.runtime.Network;
import org.apache.drill.categories.MongoStorageTest;
import org.apache.drill.categories.SlowTest;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.apache.drill.test.BaseTest;
import org.apache.hadoop.conf.Configuration;
import org.bson.Document;
@@ -55,15 +40,19 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.images.builder.Transferable;
+import java.io.File;
import java.io.IOException;
import java.net.URLEncoder;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
@RunWith(Suite.class)
@Suite.SuiteClasses({
@@ -82,152 +71,145 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
private static final Logger logger = LoggerFactory.getLogger(MongoTestSuite.class);
protected static MongoClient mongoClient;
- private static boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
- private static boolean authEnabled = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.authEnabled", "false"));
+ private static final boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
private static volatile String connectionURL = null;
- private static volatile AtomicInteger initCount = new AtomicInteger(0);
+ private static final AtomicInteger initCount = new AtomicInteger(0);
+
+ private static ContainerManager containerManager;
public static String getConnectionURL() {
return connectionURL;
}
- private static class DistributedMode {
- private static MongosSystemForTestFactory mongosTestFactory;
-
- private static String setup() throws Exception {
- // creating configServers
- List<IMongodConfig> configServers = new ArrayList<>(1);
- configServers.add(crateConfigServerConfig(CONFIG_SERVER_1_PORT));
- configServers.add(crateConfigServerConfig(CONFIG_SERVER_2_PORT));
- configServers.add(crateConfigServerConfig(CONFIG_SERVER_3_PORT));
-
- // creating replicaSets
- // A LinkedHashMap ensures that the config servers are started first.
- Map<String, List<IMongodConfig>> replicaSets = new LinkedHashMap<>();
-
- List<IMongodConfig> replicaSet1 = new ArrayList<>();
- replicaSet1.add(crateIMongodConfig(MONGOD_1_PORT, false, REPLICA_SET_1_NAME));
- replicaSet1.add(crateIMongodConfig(MONGOD_2_PORT, false, REPLICA_SET_1_NAME));
- replicaSet1.add(crateIMongodConfig(MONGOD_3_PORT, false, REPLICA_SET_1_NAME));
-
- List<IMongodConfig> replicaSet2 = new ArrayList<>();
- replicaSet2.add(crateIMongodConfig(MONGOD_4_PORT, false, REPLICA_SET_2_NAME));
- replicaSet2.add(crateIMongodConfig(MONGOD_5_PORT, false, REPLICA_SET_2_NAME));
- replicaSet2.add(crateIMongodConfig(MONGOD_6_PORT, false, REPLICA_SET_2_NAME));
-
- replicaSets.put(CONFIG_REPLICA_SET, configServers);
- replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
- replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
-
- // create mongo shards
- IMongosConfig mongosConfig = createIMongosConfig();
- mongosTestFactory = new MongosSystemForTestFactory(mongosConfig, replicaSets, Lists.newArrayList(),
- EMPLOYEE_DB, EMPINFO_COLLECTION,"employee_id");
- try {
- mongosTestFactory.start();
- mongoClient = (MongoClient) mongosTestFactory.getMongo();
- } catch (Throwable e) {
- logger.error(" Error while starting sharded cluster. ", e);
- throw new Exception(" Error while starting sharded cluster. ", e);
- }
- createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
- createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
- createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+ private abstract static class ContainerManager {
+ protected static List<GenericContainer<?>> mongoContainers;
- // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
- return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
- }
+ public abstract String setup() throws Exception;
- private static IMongodConfig crateConfigServerConfig(int configServerPort) throws IOException {
- IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
- .useNoPrealloc(false)
- .useSmallFiles(false)
- .useNoJournal(false)
- .useStorageEngine(STORAGE_ENGINE)
- .verbose(false)
- .build();
-
- Storage replication = new Storage(null, CONFIG_REPLICA_SET, 0);
-
- return new MongodConfigBuilder()
- .version(Version.Main.V3_4)
- .net(new Net(LOCALHOST, configServerPort, Network.localhostIsIPv6()))
- .replication(replication)
- .shardServer(false)
- .configServer(true).cmdOptions(cmdOptions).build();
+ public void cleanup() {
+ mongoContainers.forEach(GenericContainer::stop);
}
- private static IMongodConfig crateIMongodConfig(int mongodPort, boolean flag, String replicaName)
- throws IOException {
- IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
- .useNoPrealloc(false)
- .useSmallFiles(false)
- .useNoJournal(false)
- .useStorageEngine(STORAGE_ENGINE)
- .verbose(false)
- .build();
-
- Storage replication = new Storage(null, replicaName, 0);
-
- return new MongodConfigBuilder()
- .version(Version.Main.V3_4)
- .shardServer(true)
- .net(new Net(LOCALHOST, mongodPort, Network.localhostIsIPv6()))
- .configServer(flag).replication(replication).cmdOptions(cmdOptions)
- .build();
+ public GenericContainer<?> getMasterContainer() {
+ return mongoContainers.iterator().next();
}
+ }
- private static IMongosConfig createIMongosConfig() throws IOException {
- IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
- .useNoPrealloc(false)
- .useSmallFiles(false)
- .useNoJournal(false)
- .useStorageEngine(STORAGE_ENGINE)
- .verbose(false)
- .build();
-
- return new MongosConfigBuilder()
- .version(Version.Main.V3_4)
- .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
- .replicaSet(CONFIG_REPLICA_SET)
- .configDB(LOCALHOST + ":" + CONFIG_SERVER_1_PORT)
- .cmdOptions(cmdOptions).build();
- }
+ private static class DistributedMode extends ContainerManager {
- private static void cleanup() {
- if (mongosTestFactory != null) {
- // ignoring exception because sometimes provided time isn't enough to stop mongod processes
- try {
- mongosTestFactory.stop();
- } catch (IllegalStateException e) {
- logger.warn("Failed to close all mongod processes during provided timeout", e);
- }
- }
+ @Override
+ public String setup() throws Exception {
+ Network network = Network.newNetwork();
+
+ mongoContainers = Stream.of("m1", "m2", "m3")
+ .map(host -> new GenericContainer<>("mongo:4.4.5")
+ .withNetwork(network)
+ .withNetworkAliases(host)
+ .withExposedPorts(MONGOS_PORT)
+ .withCommand(String.format("mongod --port %d --shardsvr --replSet rs0 --bind_ip localhost,%s", MONGOS_PORT, host)))
+ .collect(Collectors.toList());
+
+ String configServerHost = "m4";
+ GenericContainer<?> configServer = new GenericContainer<>("mongo:4.4.5")
+ .withNetwork(network)
+ .withNetworkAliases(configServerHost)
+ .withExposedPorts(MONGOS_PORT)
+ .withCommand(String.format("mongod --configsvr --port %s --replSet rs0conf --bind_ip localhost,%s", MONGOS_PORT, configServerHost));
+
+ configServer.start();
+
+ Container.ExecResult execResult = configServer.execInContainer("/bin/bash", "-c",
+ String.format("echo 'rs.initiate({_id: \"rs0conf\",configsvr: true, members: [{ _id : 0, host : \"%s:%2$s\" }]})' | mongo --port %2$s", configServerHost, MONGOS_PORT));
+
+ logger.info(execResult.toString());
+
+ String mongosHost = "m5";
+ GenericContainer<?> mongos = new GenericContainer<>("mongo:4.4.5")
+ .withNetwork(network)
+ .withNetworkAliases(mongosHost)
+ .withExposedPorts(MONGOS_PORT)
+ .withCommand(String.format("mongos --configdb rs0conf/%1$s:%2$s --bind_ip localhost,%3$s --port %2$s", configServerHost, MONGOS_PORT, mongosHost));
+
+ mongos.start();
+
+ mongoContainers.forEach(GenericContainer::start);
+
+ GenericContainer<?> master = getMasterContainer();
+
+ execResult = master.execInContainer("/bin/bash", "-c",
+ String.format("mongo --port %1$s --eval 'printjson(rs.initiate({_id:\"rs0\"," +
+ "members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"},{_id:2,host:\"m3:%1$s\"}]}))' --quiet", MONGOS_PORT));
+ logger.info(execResult.toString());
+
+ execResult = master.execInContainer("/bin/bash", "-c",
+ String.format("until mongo --port %s --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done", MONGOS_PORT));
+ logger.info(execResult.toString());
+
+ execResult = mongos.execInContainer("/bin/bash", "-c", "echo 'sh.addShard(\"rs0/m1\")' | mongo --port " + MONGOS_PORT);
+ logger.info(execResult.toString());
+
+ String replicaSetUrl = String.format("mongodb://%s:%s", mongos.getContainerIpAddress(), mongos.getMappedPort(MONGOS_PORT));
+
+ mongoClient = MongoClients.create(replicaSetUrl);
+
+ logger.info("Execute list shards.");
+ execResult = master.execInContainer("/bin/bash", "-c", "mongo --eval 'db.adminCommand({ listShards: 1 })' --port " + MONGOS_PORT);
+ logger.info(execResult.toString());
+
+ // Enabled sharding at database level
+ logger.info("Enabled sharding at database level");
+ execResult = mongos.execInContainer("/bin/bash", "-c", String.format("mongo --eval 'db.adminCommand( {\n" +
+ " enableSharding: \"%s\"\n" +
+ "} )'", EMPLOYEE_DB));
+ logger.info(execResult.toString());
+
+ // Create index in sharded collection
+ logger.info("Create index in sharded collection");
+ MongoDatabase db = mongoClient.getDatabase(EMPLOYEE_DB);
+ db.getCollection(EMPINFO_COLLECTION).createIndex(Indexes.ascending("employee_id"));
+
+ // Shard the collection
+ logger.info("Shard the collection: {}.{}", EMPLOYEE_DB, EMPINFO_COLLECTION);
+ execResult = mongos.execInContainer("/bin/bash", "-c", String.format(
+ "echo 'sh.shardCollection(\"%s.%s\", {\"employee_id\" : 1})' | mongo ", EMPLOYEE_DB, EMPINFO_COLLECTION));
+ logger.info(execResult.toString());
+ createMongoUser();
+ createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
+ createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
+ createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+
+ // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+ return String.format("mongodb://%s:%s", LOCALHOST, mongos.getMappedPort(MONGOS_PORT));
}
}
- private static class SingleMode {
+ public static class SingleMode extends ContainerManager {
- private static MongodExecutable mongodExecutable;
- private static MongodProcess mongod;
+ @Override
+ public String setup() throws IOException {
+ mongoContainers = Collections.singletonList(new GenericContainer<>("mongo:4.4.5")
+ .withNetwork(Network.SHARED)
+ .withNetworkAliases("M1")
+ .withExposedPorts(MONGOS_PORT)
+ .withCommand("--replSet rs0 --bind_ip localhost,M1"));
- private static String setup() throws IOException {
- IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder().verbose(false)
- .enableAuth(authEnabled).build();
+ mongoContainers.forEach(GenericContainer::start);
+ GenericContainer<?> master = getMasterContainer();
- IMongodConfig mongodConfig = new MongodConfigBuilder()
- .version(Version.Main.V3_4)
- .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
- .cmdOptions(cmdOptions).build();
+ try {
+ master.execInContainer("/bin/bash", "-c",
+ "mongo --eval 'printjson(rs.initiate({_id:\"rs0\","
+ + "members:[{_id:0,host:\"M1:27017\"}]}))' "
+ + "--quiet");
+ master.execInContainer("/bin/bash", "-c",
+ "until mongo --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;"
+ + "do sleep 1;done");
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to initiate rs.", e);
+ }
- // Configure to write Mongo message to the log. Change this to
- // defaults() if needed for debugging; will write to the console instead.
- IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder().defaultsWithLogger(
- Command.MongoD, logger).build();
- mongodExecutable = MongodStarter.getInstance(runtimeConfig).prepare(
- mongodConfig);
- mongod = mongodExecutable.start();
- mongoClient = new MongoClient(new ServerAddress(LOCALHOST, MONGOS_PORT));
+ String connectionString = String.format("mongodb://%s:%d", master.getContainerIpAddress(), master.getFirstMappedPort());
+ mongoClient = MongoClients.create(connectionString);
createMongoUser();
createDbAndCollections(EMPLOYEE_DB, EMPINFO_COLLECTION, "employee_id");
@@ -235,16 +217,7 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
- return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
- }
-
- private static void cleanup() {
- if (mongod != null) {
- mongod.stop();
- }
- if (mongodExecutable != null) {
- mongodExecutable.stop();
- }
+ return connectionString;
}
}
@@ -254,17 +227,22 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
if (initCount.get() == 0) {
if (distMode) {
logger.info("Executing tests in distributed mode");
- connectionURL = DistributedMode.setup();
+ containerManager = new DistributedMode();
} else {
logger.info("Executing tests in single mode");
- connectionURL = SingleMode.setup();
+ containerManager = new SingleMode();
}
+ connectionURL = containerManager.setup();
// ToDo DRILL-7269: fix the way how data are imported for the sharded mongo cluster
- TestTableGenerator.importData(EMPLOYEE_DB, EMPINFO_COLLECTION, EMP_DATA);
- TestTableGenerator.importData(EMPLOYEE_DB, SCHEMA_CHANGE_COLLECTION, SCHEMA_CHANGE_DATA);
- TestTableGenerator.importData(DONUTS_DB, DONUTS_COLLECTION, DONUTS_DATA);
- TestTableGenerator.importData(DATATYPE_DB, DATATYPE_COLLECTION, DATATYPE_DATA);
- TestTableGenerator.importData(ISSUE7820_DB, ISSUE7820_COLLECTION, EMP_DATA);
+ containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(EMP_DATA).toURI()), Charsets.UTF_8).read().getBytes()), EMP_DATA);
+ containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(SCHEMA_CHANGE_DATA).toURI()), Charsets.UTF_8).read().getBytes()), SCHEMA_CHANGE_DATA);
+ containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(DONUTS_DATA).toURI()), Charsets.UTF_8).read().getBytes()), DONUTS_DATA);
+ containerManager.getMasterContainer().copyFileToContainer(Transferable.of(Files.asCharSource(new File(Resources.getResource(DATATYPE_DATA).toURI()), Charsets.UTF_8).read().getBytes()), DATATYPE_DATA);
+ TestTableGenerator.importData(containerManager.getMasterContainer(), EMPLOYEE_DB, EMPINFO_COLLECTION, EMP_DATA);
+ TestTableGenerator.importData(containerManager.getMasterContainer(), EMPLOYEE_DB, SCHEMA_CHANGE_COLLECTION, SCHEMA_CHANGE_DATA);
+ TestTableGenerator.importData(containerManager.getMasterContainer(), DONUTS_DB, DONUTS_COLLECTION, DONUTS_DATA);
+ TestTableGenerator.importData(containerManager.getMasterContainer(), DATATYPE_DB, DATATYPE_COLLECTION, DATATYPE_DATA);
+ TestTableGenerator.importData(containerManager.getMasterContainer(), ISSUE7820_DB, ISSUE7820_COLLECTION, EMP_DATA);
}
initCount.incrementAndGet();
}
@@ -316,19 +294,15 @@ public class MongoTestSuite extends BaseTest implements MongoTestConstants {
if (initCount.decrementAndGet() == 0) {
try {
if (mongoClient != null) {
- mongoClient.dropDatabase(EMPLOYEE_DB);
- mongoClient.dropDatabase(DATATYPE_DB);
- mongoClient.dropDatabase(DONUTS_DB);
+ mongoClient.getDatabase(EMPLOYEE_DB).drop();
+ mongoClient.getDatabase(DATATYPE_DB).drop();
+ mongoClient.getDatabase(DONUTS_DB).drop();
}
} finally {
if (mongoClient != null) {
mongoClient.close();
}
- if (distMode) {
- DistributedMode.cleanup();
- } else {
- SingleMode.cleanup();
- }
+ containerManager.cleanup();
}
}
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
index 673a36d..d1fd63f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.mongo;
import com.mongodb.MongoCredential;
+import com.mongodb.client.internal.MongoClientImpl;
import org.apache.drill.categories.MongoStorageTest;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -25,9 +26,9 @@ import org.apache.drill.test.BaseTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
@Category({MongoStorageTest.class})
public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
@@ -36,12 +37,12 @@ public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
MongoStoragePlugin plugin = new MongoStoragePlugin(
new MongoStoragePluginConfig(connection, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
null, name);
- List<MongoCredential> creds = plugin.getClient().getCredentialsList();
+ MongoClientImpl client = (MongoClientImpl) plugin.getClient();
+ MongoCredential cred = client.getSettings().getCredential();
if (expectedUserName == null) {
- assertEquals(0, creds.size());
+ assertNull(cred);
} else {
- assertEquals(1, creds.size());
- MongoCredential cred = creds.get(0);
+ assertNotNull(cred);
assertEquals(expectedUserName, cred.getUserName());
assertEquals(expectedPassword, new String(cred.getPassword()));
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
index 6051fcd..c846343 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
@@ -17,64 +17,25 @@
*/
package org.apache.drill.exec.store.mongo;
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
-import de.flapdoodle.embed.mongo.Command;
-import de.flapdoodle.embed.mongo.MongoImportExecutable;
-import de.flapdoodle.embed.mongo.MongoImportProcess;
-import de.flapdoodle.embed.mongo.MongoImportStarter;
-import de.flapdoodle.embed.mongo.config.IMongoImportConfig;
-import de.flapdoodle.embed.mongo.config.MongoImportConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.config.RuntimeConfigBuilder;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.config.IRuntimeConfig;
-import de.flapdoodle.embed.process.runtime.Network;
+import java.io.IOException;
public class TestTableGenerator implements MongoTestConstants {
private static final Logger logger = LoggerFactory
.getLogger(TestTableGenerator.class);
- public static void importData(String dbName, String collectionName,
- String fileName) throws InterruptedException, IOException, URISyntaxException {
- File jsonFile = new File(Resources.getResource(fileName).toURI());
- generateTable(dbName, collectionName, jsonFile.getAbsolutePath(), true, true, false);
- }
-
- public static void generateTable(String dbName, String collection,
- String jsonFile, Boolean jsonArray, Boolean upsert, Boolean drop)
- throws InterruptedException, IOException {
- logger.info("Started importing file {} into collection {} ", jsonFile,
- collection);
- IMongoImportConfig mongoImportConfig = new MongoImportConfigBuilder()
- .version(Version.Main.V3_4)
- .net(new Net(MONGOS_PORT, Network.localhostIsIPv6())).db(dbName)
- .collection(collection).upsert(upsert).dropCollection(drop)
- .jsonArray(jsonArray).importFile(jsonFile).build();
- // Configure to write Mongo message to the log. Change this to
- // .getDefaultInstance() if needed for debugging; will write to
- // the console instead.
- IRuntimeConfig rtConfig = new RuntimeConfigBuilder()
- .defaultsWithLogger(Command.MongoImport, logger)
- .daemonProcess(false)
- .build();
- MongoImportExecutable importExecutable = MongoImportStarter
- .getInstance(rtConfig).prepare(mongoImportConfig);
- MongoImportProcess importProcess = importExecutable.start();
-
- // import is in a separate process, we should wait until the process exit
- while (importProcess.isProcessRunning()) {
- Thread.sleep(1000);
- }
+ public static void importData(GenericContainer<?> mongo, String dbName, String collectionName,
+ String fileName) throws InterruptedException, IOException {
+ Container.ExecResult execResult = mongo.execInContainer("/bin/bash", "-c",
+ "mongoimport --db " + dbName + " --collection " + collectionName + " --jsonArray --upsert --file " + fileName);
+ logger.info(execResult.toString());
- logger.info("Imported file {} into collection {} ", jsonFile, collection);
+ logger.info("Imported file {} into collection {} ", fileName, collectionName);
}
}
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index a08e172..65d7db4 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -198,11 +198,6 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.12.7</version>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
<version>${jackson.version}</version>