You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by js...@apache.org on 2015/04/29 19:49:07 UTC
drill git commit: DRILL-1502: Can't connect to mongo when requiring
auth
Repository: drill
Updated Branches:
refs/heads/master c7cb36c8b -> f5b0f4928
DRILL-1502: Can't connect to mongo when requiring auth
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f5b0f492
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f5b0f492
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f5b0f492
Branch: refs/heads/master
Commit: f5b0f4928d9c8c47c145a179c52ba3933d85c0b4
Parents: c7cb36c
Author: Kamesh <ka...@gmail.com>
Authored: Wed Oct 29 21:22:25 2014 +0530
Committer: Jason Altekruse <al...@gmail.com>
Committed: Wed Apr 29 08:22:41 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/mongo/MongoCnxnKey.java | 75 ++++++++++++++++++++
.../exec/store/mongo/MongoCnxnManager.java | 31 ++++----
.../drill/exec/store/mongo/MongoGroupScan.java | 27 ++++---
.../exec/store/mongo/MongoRecordReader.java | 8 ++-
.../exec/store/mongo/MongoScanBatchCreator.java | 5 +-
.../store/mongo/MongoStoragePluginConfig.java | 15 +++-
.../store/mongo/schema/MongoSchemaFactory.java | 38 +++++++---
7 files changed, 159 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
new file mode 100644
index 0000000..5323d1b
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnKey.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.ServerAddress;
+
+/**
+ * Cache key identifying a Mongo connection by server address and user name.
+ * Either component may be null; the user is null when no credential is set.
+ */
+public class MongoCnxnKey {
+
+  // Immutable: the key is used in a cache, so its hash must never change.
+  private final ServerAddress address;
+  private final String user;
+
+  /** Creates a key; either argument may be null. */
+  public MongoCnxnKey(ServerAddress address, String user) {
+    this.address = address;
+    this.user = user;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((address == null) ? 0 : address.hashCode());
+    result = prime * result + ((user == null) ? 0 : user.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    // Keys are equal only when both address and user match (null == null).
+    MongoCnxnKey other = (MongoCnxnKey) obj;
+    return nullSafeEquals(address, other.address)
+        && nullSafeEquals(user, other.user);
+  }
+
+  /** Returns true when both are null or {@code a.equals(b)}. */
+  private static boolean nullSafeEquals(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /**
+   * Renders the key for log messages. Uses plain concatenation so that a null
+   * address prints as "null" instead of throwing NullPointerException.
+   */
+  @Override
+  public String toString() {
+    return "[address=" + address + ", user=" + user + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
index a4482dd..35cc265 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.mongo;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,13 +31,14 @@ import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
public class MongoCnxnManager {
private static final Logger logger = LoggerFactory
.getLogger(MongoCnxnManager.class);
- private static Cache<ServerAddress, MongoClient> addressClientMap;
+ private static Cache<MongoCnxnKey, MongoClient> addressClientMap;
static {
addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
@@ -45,28 +47,33 @@ public class MongoCnxnManager {
}
private static class AddressCloser implements
- RemovalListener<ServerAddress, MongoClient> {
+ RemovalListener<MongoCnxnKey, MongoClient> {
@Override
public synchronized void onRemoval(
- RemovalNotification<ServerAddress, MongoClient> removal) {
+ RemovalNotification<MongoCnxnKey, MongoClient> removal) {
removal.getValue().close();
- ;
logger.debug("Closed connection to {}.", removal.getKey().toString());
}
}
public synchronized static MongoClient getClient(
- List<ServerAddress> addresses, MongoClientOptions clientOptions)
- throws UnknownHostException {
+ List<ServerAddress> addresses, MongoClientOptions clientOptions,
+ MongoCredential credential) throws UnknownHostException {
// Take the first replica from the replicated servers
ServerAddress serverAddress = addresses.get(0);
- MongoClient client = addressClientMap.getIfPresent(serverAddress);
+ String userName = credential == null ? null : credential.getUserName();
+ MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
+ MongoClient client = addressClientMap.getIfPresent(key);
if (client == null) {
- client = new MongoClient(addresses, clientOptions);
- addressClientMap.put(serverAddress, client);
- logger.debug("Created connection to {}.", serverAddress.toString());
- logger.debug("Number of connections opened are {}.",
- addressClientMap.size());
+ if (credential != null) {
+ List<MongoCredential> credentialList = Arrays.asList(credential);
+ client = new MongoClient(addresses, credentialList, clientOptions);
+ } else {
+ client = new MongoClient(addresses, clientOptions);
+ }
+ addressClientMap.put(key, client);
+ logger.debug("Created connection to {}.", key.toString());
+ logger.debug("Number of open connections {}.", addressClientMap.size());
}
return client;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 54d34f9..e33d2ae 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -165,19 +165,22 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
private boolean isShardedCluster(MongoClient client) {
- // need to check better way of identifying
- List<String> databaseNames = client.getDatabaseNames();
- return databaseNames.contains(CONFIG);
+ DB db = client.getDB(scanSpec.getDbName());
+ String msg = db.command("isMaster").getString("msg");
+ return msg == null ? false : msg.equals("isdbgrid");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void init() throws IOException {
- MongoClient client = null;
try {
- MongoClientURI clientURI = new MongoClientURI(
- this.storagePluginConfig.getConnection());
- client = new MongoClient(clientURI);
-
+ List<String> h = storagePluginConfig.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : h) {
+ addresses.add(new ServerAddress(host));
+ }
+ MongoClient client = MongoCnxnManager.getClient(addresses,
+ storagePluginConfig.getMongoOptions(),
+ storagePluginConfig.getMongoCrendials());
chunksMapping = Maps.newHashMap();
chunksInverseMapping = Maps.newLinkedHashMap();
if (isShardedCluster(client)) {
@@ -268,7 +271,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
} else {
String chunkName = scanSpec.getDbName() + "."
+ scanSpec.getCollectionName();
- List<String> hosts = clientURI.getHosts();
+ List<String> hosts = storagePluginConfig.getHosts();
Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
if (addressList == null) {
addressList = Sets.newHashSet();
@@ -289,10 +292,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
} catch (UnknownHostException e) {
throw new DrillRuntimeException(e.getMessage(), e);
- } finally {
- if (client != null) {
- client.close();
- }
}
}
@@ -475,7 +474,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
addresses.add(new ServerAddress(host));
}
MongoClient client = MongoCnxnManager.getClient(addresses,
- clientURI.getOptions());
+ clientURI.getOptions(), clientURI.getCredentials());
DB db = client.getDB(scanSpec.getDbName());
DBCollection collection = db.getCollectionFromString(scanSpec
.getCollectionName());
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 0b263df..3c4472c 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -51,6 +51,7 @@ import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
@@ -67,6 +68,7 @@ public class MongoRecordReader extends AbstractRecordReader {
private DBObject fields;
private MongoClientOptions clientOptions;
+ private MongoCredential credential;
private FragmentContext fragmentContext;
private OperatorContext operatorContext;
@@ -74,8 +76,9 @@ public class MongoRecordReader extends AbstractRecordReader {
public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context,
- MongoClientOptions clientOptions) {
+ MongoClientOptions clientOptions, MongoCredential credential) {
this.clientOptions = clientOptions;
+ this.credential = credential;
this.fields = new BasicDBObject();
// exclude _id field, if not mentioned by user.
this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
@@ -134,7 +137,8 @@ public class MongoRecordReader extends AbstractRecordReader {
for (String host : hosts) {
addresses.add(new ServerAddress(host));
}
- MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
+ MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
+ credential);
DB db = client.getDB(subScanSpec.getDbName());
collection = db.getCollection(subScanSpec.getCollectionName());
} catch (UnknownHostException e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 8e5fd7d..c4597b5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoCredential;
public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
static final Logger logger = LoggerFactory
@@ -52,8 +53,10 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
}
MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
.getMongoOptions();
+ MongoCredential mongoCrendials = subScan.getMongoPluginConfig()
+ .getMongoCrendials();
readers.add(new MongoRecordReader(scanSpec, columns, context,
- clientOptions));
+ clientOptions, mongoCrendials));
} catch (Exception e) {
logger.error("MongoRecordReader creation failed for subScan: "
+ subScan + ".");
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index b7cbf24..b39e7b9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.mongo;
+import java.util.List;
+
import org.apache.drill.common.logical.StoragePluginConfig;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -25,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
@JsonTypeName(MongoStoragePluginConfig.NAME)
public class MongoStoragePluginConfig extends StoragePluginConfig {
@@ -41,6 +44,7 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
@JsonCreator
public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
this.connection = connection;
+ this.clientURI = new MongoClientURI(connection);
}
@Override
@@ -61,11 +65,20 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
}
@JsonIgnore
+ public MongoCredential getMongoCrendials() {
+ return clientURI.getCredentials();
+ }
+
+ @JsonIgnore
public MongoClientOptions getMongoOptions() {
- MongoClientURI clientURI = new MongoClientURI(connection);
return clientURI.getOptions();
}
+ @JsonIgnore
+ public List<String> getHosts() {
+ return clientURI.getHosts();
+ }
+
public String getConnection() {
return connection;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f5b0f492/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index c92a35d..fccffb5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -52,7 +53,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.DB;
import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
+import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
@@ -70,21 +72,20 @@ public class MongoSchemaFactory implements SchemaFactory {
private final List<ServerAddress> addresses;
private final MongoClientOptions options;
+ private final MongoCredential credential;
public MongoSchemaFactory(MongoStoragePlugin schema, String schemaName)
throws ExecutionSetupException, UnknownHostException {
- String connection = schema.getConfig().getConnection();
-
this.plugin = schema;
this.schemaName = schemaName;
- MongoClientURI clientURI = new MongoClientURI(connection);
- List<String> hosts = clientURI.getHosts();
+ List<String> hosts = plugin.getConfig().getHosts();
addresses = Lists.newArrayList();
for (String host : hosts) {
addresses.add(new ServerAddress(host));
}
- options = clientURI.getOptions();
+ options = plugin.getConfig().getMongoOptions();
+ credential = plugin.getConfig().getMongoCrendials();
databases = CacheBuilder //
.newBuilder() //
@@ -104,7 +105,16 @@ public class MongoSchemaFactory implements SchemaFactory {
if (!DATABASES.equals(key)) {
throw new UnsupportedOperationException();
}
- return MongoCnxnManager.getClient(addresses, options).getDatabaseNames();
+ try {
+ return MongoCnxnManager.getClient(addresses, options, credential)
+ .getDatabaseNames();
+ } catch (MongoException me) {
+ logger.warn("Failure while loading databases in Mongo. {}",
+ me.getMessage());
+ return Collections.emptyList();
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
}
}
@@ -113,9 +123,17 @@ public class MongoSchemaFactory implements SchemaFactory {
@Override
public List<String> load(String dbName) throws Exception {
- DB db = MongoCnxnManager.getClient(addresses, options).getDB(dbName);
- Set<String> collectionNames = db.getCollectionNames();
- return new ArrayList<>(collectionNames);
+ try {
+ DB db = MongoCnxnManager.getClient(addresses, options, credential)
+ .getDB(dbName);
+ return new ArrayList<>(db.getCollectionNames());
+ } catch (MongoException me) {
+ logger.warn("Failure while getting collection names from '{}'. {}",
+ dbName, me.getMessage());
+ return Collections.emptyList();
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
}
}