You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/04/27 20:31:36 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2201: DRILL-7903: Update mongo driver from 3.12 to 4.2

vvysotskyi commented on a change in pull request #2201:
URL: https://github.com/apache/drill/pull/2201#discussion_r615376855



##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -530,8 +503,7 @@ public ScanStats getScanStats() {
       if (recordCount != 0) {
         //toJson should use client's codec, otherwise toJson could fail on
         // some types not known to DocumentCodec, e.g. DBRef.
-        final DocumentCodec codec =
-            new DocumentCodec(client.getMongoClientOptions().getCodecRegistry(), new BsonTypeClassMap());
+        DocumentCodec codec = new DocumentCodec(db.getCodecRegistry(), new BsonTypeClassMap());

Review comment:
       Yes, the API was changed; there is no longer a method for obtaining `MongoClientOptions`.

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -297,30 +279,28 @@ private void handleUnshardedCollection(List<String> hosts) {
     String host = hosts.get(0);
     ServerAddress address = new ServerAddress(host);
     ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
-    chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
-    chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+    chunkInfo.setMinFilters(Collections.emptyMap());
+    chunkInfo.setMaxFilters(Collections.emptyMap());
     List<ChunkInfo> chunksList = Lists.newArrayList();
     chunksList.add(chunkInfo);
     chunksInverseMapping.put(address.getHost(), chunksList);
   }
 
-  private List<String> getPrimaryShardInfo(MongoClient client) {
+  private List<String> getPrimaryShardInfo() {
     MongoDatabase database = storagePlugin.getClient().getDatabase(CONFIG);
     //Identify the primary shard of the queried database.
     MongoCollection<Document> collection = database.getCollection(DATABASES);
     Bson filter = new Document(ID, this.scanSpec.getDbName());
     Bson projection = new Document(PRIMARY, select);
-    Document document = collection.find(filter).projection(projection).first();
-    Preconditions.checkNotNull(document);
+    Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());

Review comment:
       I have reproduced it locally and fixed it.

##########
File path: contrib/storage-mongo/pom.xml
##########
@@ -64,9 +64,9 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>de.flapdoodle.embed</groupId>

Review comment:
       `de.flapdoodle.embed` uses `mongo-java-driver` and is incompatible with `mongodb-driver-sync`, so we have to switch to another solution for testing.

##########
File path: contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
##########
@@ -82,169 +71,133 @@
   private static final Logger logger = LoggerFactory.getLogger(MongoTestSuite.class);
   protected static MongoClient mongoClient;
 
-  private static boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
-  private static boolean authEnabled = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.authEnabled", "false"));
+  private static final boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
   private static volatile String connectionURL = null;
-  private static volatile AtomicInteger initCount = new AtomicInteger(0);
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+
+  private static ContainerManager containerManager;
 
   public static String getConnectionURL() {
     return connectionURL;
   }
 
-  private static class DistributedMode {
-    private static MongosSystemForTestFactory mongosTestFactory;
-
-    private static String setup() throws Exception {
-      // creating configServers
-      List<IMongodConfig> configServers = new ArrayList<>(1);
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_1_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_2_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_3_PORT));
-
-      // creating replicaSets
-      // A LinkedHashMap ensures that the config servers are started first.
-      Map<String, List<IMongodConfig>> replicaSets = new LinkedHashMap<>();
-
-      List<IMongodConfig> replicaSet1 = new ArrayList<>();
-      replicaSet1.add(crateIMongodConfig(MONGOD_1_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_2_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_3_PORT, false, REPLICA_SET_1_NAME));
-
-      List<IMongodConfig> replicaSet2 = new ArrayList<>();
-      replicaSet2.add(crateIMongodConfig(MONGOD_4_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_5_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_6_PORT, false, REPLICA_SET_2_NAME));
-
-      replicaSets.put(CONFIG_REPLICA_SET, configServers);
-      replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
-      replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
-
-      // create mongo shards
-      IMongosConfig mongosConfig = createIMongosConfig();
-      mongosTestFactory = new MongosSystemForTestFactory(mongosConfig, replicaSets, Lists.newArrayList(),
-          EMPLOYEE_DB, EMPINFO_COLLECTION,"employee_id");
-      try {
-        mongosTestFactory.start();
-        mongoClient = (MongoClient) mongosTestFactory.getMongo();
-      } catch (Throwable e) {
-        logger.error(" Error while starting sharded cluster. ", e);
-        throw new Exception(" Error while starting sharded cluster. ", e);
-      }
-      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
-      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
-      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+  private abstract static class ContainerManager {
+    protected static List<GenericContainer<?>> mongoContainers;
 
-      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
-      return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
-    }
+    public abstract String setup() throws Exception;
 
-    private static IMongodConfig crateConfigServerConfig(int configServerPort) throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, CONFIG_REPLICA_SET, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, configServerPort, Network.localhostIsIPv6()))
-          .replication(replication)
-          .shardServer(false)
-          .configServer(true).cmdOptions(cmdOptions).build();
+    public void cleanup() {
+      mongoContainers.forEach(GenericContainer::stop);
     }
 
-    private static IMongodConfig crateIMongodConfig(int mongodPort, boolean flag, String replicaName)
-        throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, replicaName, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .shardServer(true)
-          .net(new Net(LOCALHOST, mongodPort, Network.localhostIsIPv6()))
-          .configServer(flag).replication(replication).cmdOptions(cmdOptions)
-          .build();
+    public GenericContainer<?> getMasterContainer() {
+      return mongoContainers.iterator().next();
     }
+  }
 
-    private static IMongosConfig createIMongosConfig() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      return new MongosConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .replicaSet(CONFIG_REPLICA_SET)
-          .configDB(LOCALHOST + ":" + CONFIG_SERVER_1_PORT)
-          .cmdOptions(cmdOptions).build();
-    }
+  private static class DistributedMode extends ContainerManager {
+
+    @Override
+    public String setup() throws Exception {
+      Network network = Network.newNetwork();
+
+      mongoContainers = Stream.of("m1", "m2", "m3")
+          .map(networkAlias -> new GenericContainer<>("mongo:4.4.5")
+              .withNetwork(network)
+              .withNetworkAliases(networkAlias)
+              .withExposedPorts(MONGOS_PORT)
+              .withCommand("--replSet rs0 --bind_ip localhost," + networkAlias))
+          .collect(Collectors.toList());
+
+      mongoContainers.forEach(GenericContainer::start);
+
+      GenericContainer<?> master = getMasterContainer();
+
+      Container.ExecResult execResult = master.execInContainer("/bin/bash", "-c",
+          String.format("mongo --eval 'printjson(rs.initiate({_id:\"rs0\"," +
+          "members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"},{_id:2,host:\"m3:%1$s\"}]}))' --quiet", MONGOS_PORT));
+      logger.info(execResult.toString());
+      execResult = master.execInContainer("/bin/bash", "-c",
+          "until mongo --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done");
+      logger.info(execResult.toString());
+
+      String hosts = Stream.of(master)
+          .map(c -> c.getContainerIpAddress() + ":" + c.getMappedPort(MONGOS_PORT))
+          .collect(Collectors.joining(","));
+
+      String replicaSetUrl = String.format("mongodb://%s", hosts);
+
+      mongoClient = MongoClients.create(replicaSetUrl);
+
+      logger.info("Execute list shards.");
+      execResult = master.execInContainer("/bin/bash", "-c", "mongo --eval 'db.adminCommand({ listShards: 1 })'");
+      logger.info(execResult.toString());
+
+      // Enabled sharding at database level
+      logger.info("Enabled sharding at database level");
+      execResult = master.execInContainer("/bin/bash", "-c", String.format("mongo --eval 'db.adminCommand( {\n" +
+          "   enableSharding: \"%s\"\n" +
+          "} )'", EMPLOYEE_DB));
+      logger.info(execResult.toString());
+
+      // Create index in sharded collection
+      logger.info("Create index in sharded collection");
+      MongoDatabase db = mongoClient.getDatabase(EMPLOYEE_DB);
+      db.getCollection(EMPINFO_COLLECTION).createIndex(Indexes.ascending("employee_id"));
+
+      // Shard the collection
+      logger.info("Shard the collection: {}.{}", EMPLOYEE_DB, EMPINFO_COLLECTION);
+      execResult = master.execInContainer("/bin/bash", "-c", String.format(
+          "mongo --eval '{\n" +
+              "   shardCollection: \"%s.%s\",\n" +
+              "   key: { employee_id: 1 },\n" +
+              "}'", EMPLOYEE_DB, EMPINFO_COLLECTION));
+      logger.info(execResult.toString());
+      createMongoUser();
+      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
+      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
+      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
 
-    private static void cleanup() {
-      if (mongosTestFactory != null) {
-        // ignoring exception because sometimes provided time isn't enough to stop mongod processes
-        try {
-            mongosTestFactory.stop();
-          } catch (IllegalStateException e) {
-            logger.warn("Failed to close all mongod processes during provided timeout", e);
-          }
-      }
+      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+      return String.format("mongodb://%s:%s", LOCALHOST, master.getMappedPort(MONGOS_PORT));
     }
   }
 
-  private static class SingleMode {
+  public static class SingleMode extends ContainerManager {
 
-    private static MongodExecutable mongodExecutable;
-    private static MongodProcess mongod;
+    @Override
+    public String setup() throws IOException {
+      mongoContainers = Collections.singletonList(new GenericContainer<>("mongo:4.4.5")
+          .withNetwork(Network.SHARED)
+          .withNetworkAliases("M1")
+          .withExposedPorts(MONGOS_PORT)
+          .withCommand("--replSet rs0 --bind_ip localhost,M1"));
 
-    private static String setup() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder().verbose(false)
-          .enableAuth(authEnabled).build();
+      mongoContainers.forEach(GenericContainer::start);
+      GenericContainer<?> master = getMasterContainer();
 
-      IMongodConfig mongodConfig = new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .cmdOptions(cmdOptions).build();
+      try {

Review comment:
       Sorry, I didn't understand what you proposed and how to use it...
   This code initializes the replica set as recommended in the official MongoDB instructions: https://docs.mongodb.com/manual/tutorial/deploy-replica-set/
   After that, it waits until the replica set is initialized.
   
   The code related to creating users is defined in the `createMongoUser()` method, and it is quite flexible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org