You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by js...@apache.org on 2015/04/29 19:49:07 UTC

drill git commit: DRILL-1502: Can't connect to mongo when requiring auth

Repository: drill
Updated Branches:
  refs/heads/master c7cb36c8b -> f5b0f4928


DRILL-1502: Can't connect to mongo when requiring auth


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f5b0f492
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f5b0f492
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f5b0f492

Branch: refs/heads/master
Commit: f5b0f4928d9c8c47c145a179c52ba3933d85c0b4
Parents: c7cb36c
Author: Kamesh <ka...@gmail.com>
Authored: Wed Oct 29 21:22:25 2014 +0530
Committer: Jason Altekruse <al...@gmail.com>
Committed: Wed Apr 29 08:22:41 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/mongo/MongoCnxnKey.java    | 75 ++++++++++++++++++++
 .../exec/store/mongo/MongoCnxnManager.java      | 31 ++++----
 .../drill/exec/store/mongo/MongoGroupScan.java  | 27 ++++---
 .../exec/store/mongo/MongoRecordReader.java     |  8 ++-
 .../exec/store/mongo/MongoScanBatchCreator.java |  5 +-
 .../store/mongo/MongoStoragePluginConfig.java   | 15 +++-
 .../store/mongo/schema/MongoSchemaFactory.java  | 38 +++++++---
 7 files changed, 159 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
new file mode 100644
index 0000000..5323d1b
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.ServerAddress;
+
+public class MongoCnxnKey {
+
+  private ServerAddress address;
+  private String user;
+
+  public MongoCnxnKey(ServerAddress address, String user) {
+    this.address = address;
+    this.user = user;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((address == null) ? 0 : address.hashCode());
+    result = prime * result + ((user == null) ? 0 : user.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    MongoCnxnKey other = (MongoCnxnKey) obj;
+    if (address == null) {
+      if (other.address != null) {
+        return false;
+      }
+    } else if (!address.equals(other.address)) {
+      return false;
+    }
+    if (user == null) {
+      if (other.user != null) {
+        return false;
+      }
+    } else if (!user.equals(other.user)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "[address=" + address.toString() + ", user=" + user + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
index a4482dd..35cc265 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -30,13 +31,14 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 
 public class MongoCnxnManager {
 
   private static final Logger logger = LoggerFactory
       .getLogger(MongoCnxnManager.class);
-  private static Cache<ServerAddress, MongoClient> addressClientMap;
+  private static Cache<MongoCnxnKey, MongoClient> addressClientMap;
 
   static {
     addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
@@ -45,28 +47,33 @@ public class MongoCnxnManager {
   }
 
   private static class AddressCloser implements
-      RemovalListener<ServerAddress, MongoClient> {
+      RemovalListener<MongoCnxnKey, MongoClient> {
     @Override
     public synchronized void onRemoval(
-        RemovalNotification<ServerAddress, MongoClient> removal) {
+        RemovalNotification<MongoCnxnKey, MongoClient> removal) {
       removal.getValue().close();
-      ;
       logger.debug("Closed connection to {}.", removal.getKey().toString());
     }
   }
 
   public synchronized static MongoClient getClient(
-      List<ServerAddress> addresses, MongoClientOptions clientOptions)
-      throws UnknownHostException {
+      List<ServerAddress> addresses, MongoClientOptions clientOptions,
+      MongoCredential credential) throws UnknownHostException {
     // Take the first replica from the replicated servers
     ServerAddress serverAddress = addresses.get(0);
-    MongoClient client = addressClientMap.getIfPresent(serverAddress);
+    String userName = credential == null ? null : credential.getUserName();
+    MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
+    MongoClient client = addressClientMap.getIfPresent(key);
     if (client == null) {
-      client = new MongoClient(addresses, clientOptions);
-      addressClientMap.put(serverAddress, client);
-      logger.debug("Created connection to {}.", serverAddress.toString());
-      logger.debug("Number of connections opened are {}.",
-          addressClientMap.size());
+      if (credential != null) {
+        List<MongoCredential> credentialList = Arrays.asList(credential);
+        client = new MongoClient(addresses, credentialList, clientOptions);
+      } else {
+        client = new MongoClient(addresses, clientOptions);
+      }
+      addressClientMap.put(key, client);
+      logger.debug("Created connection to {}.", key.toString());
+      logger.debug("Number of open connections {}.", addressClientMap.size());
     }
     return client;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 54d34f9..e33d2ae 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -165,19 +165,22 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   private boolean isShardedCluster(MongoClient client) {
-    // need to check better way of identifying
-    List<String> databaseNames = client.getDatabaseNames();
-    return databaseNames.contains(CONFIG);
+    DB db = client.getDB(scanSpec.getDbName());
+    String msg = db.command("isMaster").getString("msg");
+    return msg == null ? false : msg.equals("isdbgrid");
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void init() throws IOException {
-    MongoClient client = null;
     try {
-      MongoClientURI clientURI = new MongoClientURI(
-          this.storagePluginConfig.getConnection());
-      client = new MongoClient(clientURI);
-
+      List<String> h = storagePluginConfig.getHosts();
+      List<ServerAddress> addresses = Lists.newArrayList();
+      for (String host : h) {
+        addresses.add(new ServerAddress(host));
+      }
+      MongoClient client = MongoCnxnManager.getClient(addresses,
+          storagePluginConfig.getMongoOptions(),
+          storagePluginConfig.getMongoCrendials());
       chunksMapping = Maps.newHashMap();
       chunksInverseMapping = Maps.newLinkedHashMap();
       if (isShardedCluster(client)) {
@@ -268,7 +271,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       } else {
         String chunkName = scanSpec.getDbName() + "."
             + scanSpec.getCollectionName();
-        List<String> hosts = clientURI.getHosts();
+        List<String> hosts = storagePluginConfig.getHosts();
         Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
         if (addressList == null) {
           addressList = Sets.newHashSet();
@@ -289,10 +292,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
     } catch (UnknownHostException e) {
       throw new DrillRuntimeException(e.getMessage(), e);
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
 
   }
@@ -475,7 +474,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
         addresses.add(new ServerAddress(host));
       }
       MongoClient client = MongoCnxnManager.getClient(addresses,
-          clientURI.getOptions());
+          clientURI.getOptions(), clientURI.getCredentials());
       DB db = client.getDB(scanSpec.getDbName());
       DBCollection collection = db.getCollectionFromString(scanSpec
           .getCollectionName());

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 0b263df..3c4472c 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -51,6 +51,7 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
 
@@ -67,6 +68,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   private DBObject fields;
 
   private MongoClientOptions clientOptions;
+  private MongoCredential credential;
   private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
 
@@ -74,8 +76,9 @@ public class MongoRecordReader extends AbstractRecordReader {
 
   public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context,
-      MongoClientOptions clientOptions) {
+      MongoClientOptions clientOptions, MongoCredential credential) {
     this.clientOptions = clientOptions;
+    this.credential = credential;
     this.fields = new BasicDBObject();
     // exclude _id field, if not mentioned by user.
     this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
@@ -134,7 +137,8 @@ public class MongoRecordReader extends AbstractRecordReader {
       for (String host : hosts) {
         addresses.add(new ServerAddress(host));
       }
-      MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
+      MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
+          credential);
       DB db = client.getDB(subScanSpec.getDbName());
       collection = db.getCollection(subScanSpec.getCollectionName());
     } catch (UnknownHostException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 8e5fd7d..c4597b5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
 
 public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   static final Logger logger = LoggerFactory
@@ -52,8 +53,10 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
         }
         MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
             .getMongoOptions();
+        MongoCredential mongoCrendials = subScan.getMongoPluginConfig()
+            .getMongoCrendials();
         readers.add(new MongoRecordReader(scanSpec, columns, context,
-            clientOptions));
+            clientOptions, mongoCrendials));
       } catch (Exception e) {
         logger.error("MongoRecordReader creation failed for subScan:  "
             + subScan + ".");

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index b7cbf24..b39e7b9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.mongo;
 
+import java.util.List;
+
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -25,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
 public class MongoStoragePluginConfig extends StoragePluginConfig {
@@ -41,6 +44,7 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
   @JsonCreator
   public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
     this.connection = connection;
+    this.clientURI = new MongoClientURI(connection);
   }
 
   @Override
@@ -61,11 +65,20 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
   }
 
   @JsonIgnore
+  public MongoCredential getMongoCrendials() {
+    return clientURI.getCredentials();
+  }
+
+  @JsonIgnore
   public MongoClientOptions getMongoOptions() {
-    MongoClientURI clientURI = new MongoClientURI(connection);
     return clientURI.getOptions();
   }
 
+  @JsonIgnore
+  public List<String> getHosts() {
+    return clientURI.getHosts();
+  }
+
   public String getConnection() {
     return connection;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index c92a35d..fccffb5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -52,7 +53,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.mongodb.DB;
 import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
+import com.mongodb.MongoException;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
 
@@ -70,21 +72,20 @@ public class MongoSchemaFactory implements SchemaFactory {
 
   private final List<ServerAddress> addresses;
   private final MongoClientOptions options;
+  private final MongoCredential credential;
 
   public MongoSchemaFactory(MongoStoragePlugin schema, String schemaName)
       throws ExecutionSetupException, UnknownHostException {
-    String connection = schema.getConfig().getConnection();
-
     this.plugin = schema;
     this.schemaName = schemaName;
 
-    MongoClientURI clientURI = new MongoClientURI(connection);
-    List<String> hosts = clientURI.getHosts();
+    List<String> hosts = plugin.getConfig().getHosts();
     addresses = Lists.newArrayList();
     for (String host : hosts) {
       addresses.add(new ServerAddress(host));
     }
-    options = clientURI.getOptions();
+    options = plugin.getConfig().getMongoOptions();
+    credential = plugin.getConfig().getMongoCrendials();
 
     databases = CacheBuilder //
         .newBuilder() //
@@ -104,7 +105,16 @@ public class MongoSchemaFactory implements SchemaFactory {
       if (!DATABASES.equals(key)) {
         throw new UnsupportedOperationException();
       }
-      return MongoCnxnManager.getClient(addresses, options).getDatabaseNames();
+      try {
+        return MongoCnxnManager.getClient(addresses, options, credential)
+            .getDatabaseNames();
+      } catch (MongoException me) {
+        logger.warn("Failure while loading databases in Mongo. {}",
+            me.getMessage());
+        return Collections.emptyList();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e.getMessage(), e);
+      }
     }
 
   }
@@ -113,9 +123,17 @@ public class MongoSchemaFactory implements SchemaFactory {
 
     @Override
     public List<String> load(String dbName) throws Exception {
-      DB db = MongoCnxnManager.getClient(addresses, options).getDB(dbName);
-      Set<String> collectionNames = db.getCollectionNames();
-      return new ArrayList<>(collectionNames);
+      try {
+        DB db = MongoCnxnManager.getClient(addresses, options, credential)
+            .getDB(dbName);
+        return new ArrayList<>(db.getCollectionNames());
+      } catch (MongoException me) {
+        logger.warn("Failure while getting collection names from '{}'. {}",
+            dbName, me.getMessage());
+        return Collections.emptyList();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e.getMessage(), e);
+      }
     }
   }