Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/04/18 03:47:49 UTC

[GitHub] [drill] luocooong commented on a change in pull request #2201: DRILL-7903: Update mongo driver from 3.12 to 4.2

luocooong commented on a change in pull request #2201:
URL: https://github.com/apache/drill/pull/2201#discussion_r615330818



##########
File path: contrib/storage-mongo/pom.xml
##########
@@ -64,9 +64,9 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>de.flapdoodle.embed</groupId>

Review comment:
       `de.flapdoodle.embed` is a good embedded MongoDB framework, and it is a pity that it can no longer support our tests. Can you explain why you are replacing it with a Docker-based framework?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -84,16 +83,9 @@
 
   private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
-    @Override
-    public int compare(List<MongoSubScanSpec> list1,
-        List<MongoSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
+  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);

Review comment:
       Simpler (using the Java lambda style), but harder to read and debug. (Not a problem.)
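
For reference, a minimal standalone sketch of the two comparator forms from the hunk above, assuming `List<String>` as a stand-in for `List<MongoSubScanSpec>`. The class name `ListSizeComparatorDemo` and the helper `sortedSizes` are hypothetical and exist only for this illustration. Both forms should order lists by size in the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ListSizeComparatorDemo {

  // Pre-change form: anonymous class, so a breakpoint can be set inside compare().
  static final Comparator<List<String>> ANONYMOUS = new Comparator<List<String>>() {
    @Override
    public int compare(List<String> list1, List<String> list2) {
      return list1.size() - list2.size();
    }
  };

  // Post-change form: comparator built from a key extractor (method reference).
  static final Comparator<List<String>> KEY_BASED = Comparator.comparingInt(List::size);

  // Sorts a copy of the input with the given comparator and returns the sizes in sorted order.
  static List<Integer> sortedSizes(List<List<String>> lists, Comparator<List<String>> comparator) {
    List<List<String>> copy = new ArrayList<>(lists);
    copy.sort(comparator);
    List<Integer> sizes = new ArrayList<>();
    for (List<String> list : copy) {
      sizes.add(list.size());
    }
    return sizes;
  }

  public static void main(String[] args) {
    List<List<String>> lists = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("d"),
        Arrays.asList("e", "f"));
    System.out.println(sortedSizes(lists, ANONYMOUS));
    System.out.println(sortedSizes(lists, KEY_BASED));
  }
}
```

The readability trade-off is mostly about debugging: the anonymous class gives a named `compare` frame to step into, while the key-extractor form hides the comparison inside `Comparator.comparingInt`.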

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -297,30 +279,28 @@ private void handleUnshardedCollection(List<String> hosts) {
     String host = hosts.get(0);
     ServerAddress address = new ServerAddress(host);
     ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
-    chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
-    chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+    chunkInfo.setMinFilters(Collections.emptyMap());
+    chunkInfo.setMaxFilters(Collections.emptyMap());
     List<ChunkInfo> chunksList = Lists.newArrayList();
     chunksList.add(chunkInfo);
     chunksInverseMapping.put(address.getHost(), chunksList);
   }
 
-  private List<String> getPrimaryShardInfo(MongoClient client) {
+  private List<String> getPrimaryShardInfo() {
     MongoDatabase database = storagePlugin.getClient().getDatabase(CONFIG);
     //Identify the primary shard of the queried database.
     MongoCollection<Document> collection = database.getCollection(DATABASES);
     Bson filter = new Document(ID, this.scanSpec.getDbName());
     Bson projection = new Document(PRIMARY, select);
-    Document document = collection.find(filter).projection(projection).first();
-    Preconditions.checkNotNull(document);
+    Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());

Review comment:
       A user reported a problem on our Slack channel a few days ago:
   > I am trying to query a Sharded cluster of MongoDB using embedded mode and distributed mode. But both is giving an error of:
   query: select * from mongo.db.collection limit 1;
   warning   UserRemoteException : 	SYSTEM ERROR: ClassCastException: org.bson.types.ObjectId cannot be cast to java.lang.String
   org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR: ClassCastException: org.bson.types.ObjectId cannot be cast to java.lang.String
   Please, refer to logs for more information.
   [Error Id: 43733103-4cac-4aef-a09f-275df4b4ee4b on ubuntu:31010]
   store.mongo.all_text_mode : true
   store.mongo.bson.record.reader: false
   store.mongo.read_numbers_as_double: true
   It throws an error only when collection is sharded, on non-sharded collection, it is working fine.
   
   Could you please check that this failure is not an issue here? Thanks.
   
   > Drill 1.18
   MongoDB 4.4 Sharded (Not a Replica Sets)

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -530,8 +503,7 @@ public ScanStats getScanStats() {
       if (recordCount != 0) {
         //toJson should use client's codec, otherwise toJson could fail on
         // some types not known to DocumentCodec, e.g. DBRef.
-        final DocumentCodec codec =
-            new DocumentCodec(client.getMongoClientOptions().getCodecRegistry(), new BsonTypeClassMap());
+        DocumentCodec codec = new DocumentCodec(db.getCodecRegistry(), new BsonTypeClassMap());

Review comment:
       Is this change required by the new driver version (4.2), or did an issue already exist before?

##########
File path: contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
##########
@@ -82,169 +71,133 @@
   private static final Logger logger = LoggerFactory.getLogger(MongoTestSuite.class);
   protected static MongoClient mongoClient;
 
-  private static boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
-  private static boolean authEnabled = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.authEnabled", "false"));
+  private static final boolean distMode = Boolean.parseBoolean(System.getProperty("drill.mongo.tests.shardMode", "false"));
   private static volatile String connectionURL = null;
-  private static volatile AtomicInteger initCount = new AtomicInteger(0);
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+
+  private static ContainerManager containerManager;
 
   public static String getConnectionURL() {
     return connectionURL;
   }
 
-  private static class DistributedMode {
-    private static MongosSystemForTestFactory mongosTestFactory;
-
-    private static String setup() throws Exception {
-      // creating configServers
-      List<IMongodConfig> configServers = new ArrayList<>(1);
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_1_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_2_PORT));
-      configServers.add(crateConfigServerConfig(CONFIG_SERVER_3_PORT));
-
-      // creating replicaSets
-      // A LinkedHashMap ensures that the config servers are started first.
-      Map<String, List<IMongodConfig>> replicaSets = new LinkedHashMap<>();
-
-      List<IMongodConfig> replicaSet1 = new ArrayList<>();
-      replicaSet1.add(crateIMongodConfig(MONGOD_1_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_2_PORT, false, REPLICA_SET_1_NAME));
-      replicaSet1.add(crateIMongodConfig(MONGOD_3_PORT, false, REPLICA_SET_1_NAME));
-
-      List<IMongodConfig> replicaSet2 = new ArrayList<>();
-      replicaSet2.add(crateIMongodConfig(MONGOD_4_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_5_PORT, false, REPLICA_SET_2_NAME));
-      replicaSet2.add(crateIMongodConfig(MONGOD_6_PORT, false, REPLICA_SET_2_NAME));
-
-      replicaSets.put(CONFIG_REPLICA_SET, configServers);
-      replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
-      replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
-
-      // create mongo shards
-      IMongosConfig mongosConfig = createIMongosConfig();
-      mongosTestFactory = new MongosSystemForTestFactory(mongosConfig, replicaSets, Lists.newArrayList(),
-          EMPLOYEE_DB, EMPINFO_COLLECTION,"employee_id");
-      try {
-        mongosTestFactory.start();
-        mongoClient = (MongoClient) mongosTestFactory.getMongo();
-      } catch (Throwable e) {
-        logger.error(" Error while starting sharded cluster. ", e);
-        throw new Exception(" Error while starting sharded cluster. ", e);
-      }
-      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
-      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
-      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
+  private abstract static class ContainerManager {
+    protected static List<GenericContainer<?>> mongoContainers;
 
-      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
-      return String.format("mongodb://%s:%s", LOCALHOST, MONGOS_PORT);
-    }
+    public abstract String setup() throws Exception;
 
-    private static IMongodConfig crateConfigServerConfig(int configServerPort) throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, CONFIG_REPLICA_SET, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, configServerPort, Network.localhostIsIPv6()))
-          .replication(replication)
-          .shardServer(false)
-          .configServer(true).cmdOptions(cmdOptions).build();
+    public void cleanup() {
+      mongoContainers.forEach(GenericContainer::stop);
     }
 
-    private static IMongodConfig crateIMongodConfig(int mongodPort, boolean flag, String replicaName)
-        throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      Storage replication = new Storage(null, replicaName, 0);
-
-      return new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .shardServer(true)
-          .net(new Net(LOCALHOST, mongodPort, Network.localhostIsIPv6()))
-          .configServer(flag).replication(replication).cmdOptions(cmdOptions)
-          .build();
+    public GenericContainer<?> getMasterContainer() {
+      return mongoContainers.iterator().next();
     }
+  }
 
-    private static IMongosConfig createIMongosConfig() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder()
-        .useNoPrealloc(false)
-        .useSmallFiles(false)
-        .useNoJournal(false)
-        .useStorageEngine(STORAGE_ENGINE)
-        .verbose(false)
-        .build();
-
-      return new MongosConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .replicaSet(CONFIG_REPLICA_SET)
-          .configDB(LOCALHOST + ":" + CONFIG_SERVER_1_PORT)
-          .cmdOptions(cmdOptions).build();
-    }
+  private static class DistributedMode extends ContainerManager {
+
+    @Override
+    public String setup() throws Exception {
+      Network network = Network.newNetwork();
+
+      mongoContainers = Stream.of("m1", "m2", "m3")
+          .map(networkAlias -> new GenericContainer<>("mongo:4.4.5")
+              .withNetwork(network)
+              .withNetworkAliases(networkAlias)
+              .withExposedPorts(MONGOS_PORT)
+              .withCommand("--replSet rs0 --bind_ip localhost," + networkAlias))
+          .collect(Collectors.toList());
+
+      mongoContainers.forEach(GenericContainer::start);
+
+      GenericContainer<?> master = getMasterContainer();
+
+      Container.ExecResult execResult = master.execInContainer("/bin/bash", "-c",
+          String.format("mongo --eval 'printjson(rs.initiate({_id:\"rs0\"," +
+          "members:[{_id:0,host:\"m1:%1$s\"},{_id:1,host:\"m2:%1$s\"},{_id:2,host:\"m3:%1$s\"}]}))' --quiet", MONGOS_PORT));
+      logger.info(execResult.toString());
+      execResult = master.execInContainer("/bin/bash", "-c",
+          "until mongo --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;do sleep 1;done");
+      logger.info(execResult.toString());
+
+      String hosts = Stream.of(master)
+          .map(c -> c.getContainerIpAddress() + ":" + c.getMappedPort(MONGOS_PORT))
+          .collect(Collectors.joining(","));
+
+      String replicaSetUrl = String.format("mongodb://%s", hosts);
+
+      mongoClient = MongoClients.create(replicaSetUrl);
+
+      logger.info("Execute list shards.");
+      execResult = master.execInContainer("/bin/bash", "-c", "mongo --eval 'db.adminCommand({ listShards: 1 })'");
+      logger.info(execResult.toString());
+
+      // Enabled sharding at database level
+      logger.info("Enabled sharding at database level");
+      execResult = master.execInContainer("/bin/bash", "-c", String.format("mongo --eval 'db.adminCommand( {\n" +
+          "   enableSharding: \"%s\"\n" +
+          "} )'", EMPLOYEE_DB));
+      logger.info(execResult.toString());
+
+      // Create index in sharded collection
+      logger.info("Create index in sharded collection");
+      MongoDatabase db = mongoClient.getDatabase(EMPLOYEE_DB);
+      db.getCollection(EMPINFO_COLLECTION).createIndex(Indexes.ascending("employee_id"));
+
+      // Shard the collection
+      logger.info("Shard the collection: {}.{}", EMPLOYEE_DB, EMPINFO_COLLECTION);
+      execResult = master.execInContainer("/bin/bash", "-c", String.format(
+          "mongo --eval '{\n" +
+              "   shardCollection: \"%s.%s\",\n" +
+              "   key: { employee_id: 1 },\n" +
+              "}'", EMPLOYEE_DB, EMPINFO_COLLECTION));
+      logger.info(execResult.toString());
+      createMongoUser();
+      createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
+      createDbAndCollections(EMPLOYEE_DB, EMPTY_COLLECTION, "field_2");
+      createDbAndCollections(DATATYPE_DB, DATATYPE_COLLECTION, "_id");
 
-    private static void cleanup() {
-      if (mongosTestFactory != null) {
-        // ignoring exception because sometimes provided time isn't enough to stop mongod processes
-        try {
-            mongosTestFactory.stop();
-          } catch (IllegalStateException e) {
-            logger.warn("Failed to close all mongod processes during provided timeout", e);
-          }
-      }
+      // the way how it work: client -> router(mongos) -> Shard1 ... ShardN
+      return String.format("mongodb://%s:%s", LOCALHOST, master.getMappedPort(MONGOS_PORT));
     }
   }
 
-  private static class SingleMode {
+  public static class SingleMode extends ContainerManager {
 
-    private static MongodExecutable mongodExecutable;
-    private static MongodProcess mongod;
+    @Override
+    public String setup() throws IOException {
+      mongoContainers = Collections.singletonList(new GenericContainer<>("mongo:4.4.5")
+          .withNetwork(Network.SHARED)
+          .withNetworkAliases("M1")
+          .withExposedPorts(MONGOS_PORT)
+          .withCommand("--replSet rs0 --bind_ip localhost,M1"));
 
-    private static String setup() throws IOException {
-      IMongoCmdOptions cmdOptions = new MongoCmdOptionsBuilder().verbose(false)
-          .enableAuth(authEnabled).build();
+      mongoContainers.forEach(GenericContainer::start);
+      GenericContainer<?> master = getMasterContainer();
 
-      IMongodConfig mongodConfig = new MongodConfigBuilder()
-          .version(Version.Main.V3_4)
-          .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
-          .cmdOptions(cmdOptions).build();
+      try {

Review comment:
       Here is another way to perform the same operation (this is what I use in my production environment):
   ```
   .....
   rs.initiate(cfg);
   while(db.isMaster().ismaster == false) { }
   db = db.getSiblingDB('admin');
   db.createUser({user:"admin", pwd:"0000", roles:["root"]});
   db.createUser({user:"users", pwd:"0000", roles:["userAdminAnyDatabase"]});
   .....
   ```
   Run it with: mongo --quiet init.js
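
If the same wait were done from the Java test code rather than in a shell or JavaScript loop, a bounded polling helper might look like the sketch below. This is only an illustration under assumptions: `PrimaryWait` and `waitUntil` are hypothetical names, and the condition passed in `main` is a counter standing in for a real "is this node primary yet?" check. Unlike the empty `while` loop in the script, it sleeps between polls and gives up after a timeout.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PrimaryWait {

  // Polls the condition until it returns true or the timeout elapses.
  // Returns true if the condition was met, false on timeout or interruption.
  static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Stand-in condition: becomes true on the third poll.
    int[] calls = {0};
    boolean ready = waitUntil(() -> ++calls[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
    System.out.println("ready=" + ready + " after " + calls[0] + " polls");
  }
}
```

A timeout keeps a replica set that never elects a primary from hanging the whole test suite.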



