Posted to commits@drill.apache.org by ja...@apache.org on 2015/06/26 05:31:02 UTC
[1/3] drill git commit: DRILL-2592: Make jdbc-all dependencies non-transitive (provided scope)
Repository: drill
Updated Branches:
refs/heads/master c7c223662 -> 6503cfbff
DRILL-2592: Make jdbc-all dependencies non-transitive (provided scope)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6503cfbf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6503cfbf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6503cfbf
Branch: refs/heads/master
Commit: 6503cfbff7d52f5e409ac7e73647871a8ebec79f
Parents: 453b363
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Jun 25 10:24:32 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700
----------------------------------------------------------------------
exec/jdbc-all/pom.xml | 99 ++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6503cfbf/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index d3525f7..3052e40 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -28,15 +28,99 @@
<name>JDBC JAR with all dependencies</name>
<dependencies>
+
+ <!-- start parent dependencies -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>4.0.27.Final</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${dep.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${dep.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${dep.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${dep.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.googlecode.jmockit</groupId>
+ <artifactId>jmockit</artifactId>
+ <version>1.3</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${dep.junit.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.9.5</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>de.huxhorn.lilith</groupId>
+ <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
+ <version>0.9.44</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- end parent dependencies -->
+
<dependency>
<groupId>net.hydromatic</groupId>
- <artifactId>optiq-avatica</artifactId>
- <version>0.9-drill-r20</version>
+ <artifactId>optiq-avatica</artifactId>
+ <version>0.9-drill-r20</version>
+ <scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -124,6 +208,7 @@
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>javassist</artifactId>
@@ -135,6 +220,7 @@
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-jdbc</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>drill-storage-hive-core</artifactId>
@@ -151,11 +237,13 @@
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.6.1</version>
+ <scope>provided</scope>
</dependency>
<!-- Specify xalan and xerces versions to avoid setXIncludeAware error. -->
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
@@ -166,6 +254,7 @@
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
@@ -177,11 +266,13 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
@@ -242,8 +333,8 @@
</options>
<exclusions>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
<libs>
[3/3] drill git commit: DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1
Posted by ja...@apache.org.
DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5bb75b2d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5bb75b2d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5bb75b2d
Branch: refs/heads/master
Commit: 5bb75b2d1fc23586aff40ebfd93a7ce2084d01ad
Parents: c7c2236
Author: Kamesh <ka...@gmail.com>
Authored: Sat May 23 07:38:26 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700
----------------------------------------------------------------------
contrib/storage-mongo/pom.xml | 2 +-
.../drill/exec/store/mongo/MongoGroupScan.java | 76 ++++++++++----------
.../exec/store/mongo/MongoRecordReader.java | 21 +++---
.../drill/exec/store/mongo/MongoUtils.java | 12 ----
.../store/mongo/schema/MongoSchemaFactory.java | 21 +++---
5 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
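For context on the diffs that follow, here is a minimal sketch of the 2.x-to-3.x driver API shift this patch applies throughout the plugin; the host, database, collection, and field names are illustrative, not taken from the Drill source:

import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class Driver3xSketch {
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost"); // illustrative host
    // 2.x: DB db = client.getDB("test");
    MongoDatabase db = client.getDatabase("test");
    // 2.x: DBCollection coll = db.getCollectionFromString("chunks");
    MongoCollection<Document> coll = db.getCollection("chunks");
    // 2.x: DBCursor cursor = coll.find(query, fields);
    // 3.x: the projection moves onto a fluent FindIterable.
    FindIterable<Document> found = coll.find(new Document("ns", "test.foo"))
        .projection(new Document("shard", 1));
    MongoCursor<Document> cursor = found.iterator();
    while (cursor.hasNext()) {
      // 3.x Documents expose strict JSON via toJson(), replacing 2.x toString().
      System.out.println(cursor.next().toJson());
    }
    cursor.close();
  }
}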
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 8168861..ce4ad7e 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -38,7 +38,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
- <version>2.12.2</version>
+ <version>3.0.1</version>
<scope>compile</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index e33d2ae..b7885d3 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
import org.apache.drill.exec.store.mongo.common.ChunkInfo;
+import org.bson.Document;
import org.bson.types.MaxKey;
import org.bson.types.MinKey;
import org.slf4j.Logger;
@@ -65,15 +66,15 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;
-import com.mongodb.CommandResult;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
@JsonTypeName("mongo-scan")
public class MongoGroupScan extends AbstractGroupScan implements
@@ -165,12 +166,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
private boolean isShardedCluster(MongoClient client) {
- DB db = client.getDB(scanSpec.getDbName());
- String msg = db.command("isMaster").getString("msg");
+ MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+ String msg = db.runCommand(new Document("isMaster", 1)).getString("msg");
return msg == null ? false : msg.equals("isdbgrid");
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
private void init() throws IOException {
try {
List<String> h = storagePluginConfig.getHosts();
@@ -184,35 +185,37 @@ public class MongoGroupScan extends AbstractGroupScan implements
chunksMapping = Maps.newHashMap();
chunksInverseMapping = Maps.newLinkedHashMap();
if (isShardedCluster(client)) {
- DB db = client.getDB(CONFIG);
- DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
- DBObject query = new BasicDBObject(1);
- query
+ MongoDatabase db = client.getDatabase(CONFIG);
+ MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
+ Document filter = new Document();
+ filter
.put(
NS,
this.scanSpec.getDbName() + "."
+ this.scanSpec.getCollectionName());
- DBObject fields = new BasicDBObject();
- fields.put(SHARD, select);
- fields.put(MIN, select);
- fields.put(MAX, select);
+ Document projection = new Document();
+ projection.put(SHARD, select);
+ projection.put(MIN, select);
+ projection.put(MAX, select);
- DBCursor chunkCursor = chunksCollection.find(query, fields);
+ FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
+ MongoCursor<Document> iterator = chunkCursor.iterator();
- DBCollection shardsCollection = db.getCollectionFromString(SHARDS);
+ MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
- fields = new BasicDBObject();
- fields.put(HOST, select);
+ projection = new Document();
+ projection.put(HOST, select);
- while (chunkCursor.hasNext()) {
- DBObject chunkObj = chunkCursor.next();
+ while (iterator.hasNext()) {
+ Document chunkObj = iterator.next();
String shardName = (String) chunkObj.get(SHARD);
String chunkId = (String) chunkObj.get(ID);
- query = new BasicDBObject().append(ID, shardName);
- DBCursor hostCursor = shardsCollection.find(query, fields);
- while (hostCursor.hasNext()) {
- DBObject hostObj = hostCursor.next();
+ filter = new Document(ID, shardName);
+ FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
+ MongoCursor<Document> hostIterator = hostCursor.iterator();
+ while (hostIterator.hasNext()) {
+ Document hostObj = hostIterator.next();
String hostEntry = (String) hostObj.get(HOST);
String[] tagAndHost = StringUtils.split(hostEntry, '/');
String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
@@ -300,27 +303,27 @@ public class MongoGroupScan extends AbstractGroupScan implements
private Set<ServerAddress> getPreferredHosts(MongoClient client,
List<String> hosts) throws UnknownHostException {
Set<ServerAddress> addressList = Sets.newHashSet();
- DB db = client.getDB(scanSpec.getDbName());
+ MongoDatabase db = client.getDatabase(scanSpec.getDbName());
ReadPreference readPreference = client.getReadPreference();
+ Document command = db.runCommand(new Document("isMaster", 1));
+
switch (readPreference.getName().toUpperCase()) {
case "PRIMARY":
case "PRIMARYPREFERRED":
- String primaryHost = db.command("isMaster").getString("primary");
+ String primaryHost = command.getString("primary");
addressList.add(new ServerAddress(primaryHost));
return addressList;
case "SECONDARY":
case "SECONDARYPREFERRED":
- primaryHost = db.command("isMaster").getString("primary");
- @SuppressWarnings("unchecked")
- List<String> hostsList = (List<String>) db.command("isMaster").get(
- "hosts");
+ primaryHost = command.getString("primary");
+ List<String> hostsList = (List<String>) command.get("hosts");
hostsList.remove(primaryHost);
for (String host : hostsList) {
addressList.add(new ServerAddress(host));
}
return addressList;
case "NEAREST":
- hostsList = (List<String>) db.command("isMaster").get("hosts");
+ hostsList = (List<String>) command.get("hosts");
for (String host : hostsList) {
addressList.add(new ServerAddress(host));
}
@@ -475,12 +478,13 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
MongoClient client = MongoCnxnManager.getClient(addresses,
clientURI.getOptions(), clientURI.getCredentials());
- DB db = client.getDB(scanSpec.getDbName());
- DBCollection collection = db.getCollectionFromString(scanSpec
+ MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+ MongoCollection<Document> collection = db.getCollection(scanSpec
.getCollectionName());
- CommandResult stats = collection.getStats();
+ String json = collection.find().first().toJson();
+ float approxDiskCost = json.getBytes().length * collection.count();
return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
- stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE));
+ collection.count(), 1, approxDiskCost);
} catch (Exception e) {
throw new DrillRuntimeException(e.getMessage(), e);
}
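The preferred-host logic above now issues the isMaster command once through the 3.x runCommand API. A standalone sketch of that call, with an illustrative host and database name:

import java.util.List;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class IsMasterSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost"); // illustrative
    MongoDatabase db = client.getDatabase("admin");
    // 2.x: db.command("isMaster"); 3.x expresses the command as a Document.
    Document reply = db.runCommand(new Document("isMaster", 1));
    String primary = reply.getString("primary");            // null on a standalone mongod
    List<String> hosts = (List<String>) reply.get("hosts"); // null outside a replica set
    System.out.println("primary=" + primary + " hosts=" + hosts);
  }
}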
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 182f5a4..40fc810 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,27 +46,25 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
-import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
public class MongoRecordReader extends AbstractRecordReader {
static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
- private DBCollection collection;
- private DBCursor cursor;
+ private MongoCollection<Document> collection;
+ private MongoCursor<Document> cursor;
private JsonReader jsonReader;
private VectorContainerWriter writer;
private BasicDBObject filters;
- private DBObject fields;
+ private BasicDBObject fields;
private MongoClientOptions clientOptions;
private MongoCredential credential;
@@ -141,7 +140,7 @@ public class MongoRecordReader extends AbstractRecordReader {
}
MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
credential);
- DB db = client.getDB(subScanSpec.getDbName());
+ MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
collection = db.getCollection(subScanSpec.getCollectionName());
} catch (UnknownHostException e) {
throw new DrillRuntimeException(e.getMessage(), e);
@@ -155,7 +154,7 @@ public class MongoRecordReader extends AbstractRecordReader {
this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
logger.info("Filters Applied : " + filters);
logger.info("Fields Selected :" + fields);
- cursor = collection.find(filters, fields);
+ cursor = collection.find(filters).projection(fields).iterator();
}
@Override
@@ -170,7 +169,7 @@ public class MongoRecordReader extends AbstractRecordReader {
try {
while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) {
writer.setPosition(docCount);
- String doc = cursor.next().toString();
+ String doc = cursor.next().toJson();
jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
jsonReader.write(writer);
docCount++;
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
index b43a22f..a5fb2ad 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
@@ -22,13 +22,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.bson.LazyBSONCallback;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
-import com.mongodb.LazyWriteableDBObject;
public class MongoUtils {
@@ -52,14 +48,6 @@ public class MongoUtils {
return orQueryFilter;
}
- public static BasicDBObject deserializeFilter(byte[] filterBytes) {
- DBObject dbo = new LazyWriteableDBObject(filterBytes,
- new LazyBSONCallback());
- BasicDBObject result = new BasicDBObject();
- result.putAll(dbo);
- return result;
- }
-
public static Map<String, List<BasicDBObject>> mergeFilters(
Map<String, Object> minFilters, Map<String, Object> maxFilters) {
Map<String, List<BasicDBObject>> filters = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index fccffb5..d453fb9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -27,10 +27,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Maps;
-import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
-
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
@@ -50,13 +47,13 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.mongodb.DB;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.MongoException;
-import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoDatabase;
public class MongoSchemaFactory implements SchemaFactory {
@@ -106,8 +103,10 @@ public class MongoSchemaFactory implements SchemaFactory {
throw new UnsupportedOperationException();
}
try {
- return MongoCnxnManager.getClient(addresses, options, credential)
- .getDatabaseNames();
+ List<String> dbNames = new ArrayList<>();
+ MongoCnxnManager.getClient(addresses, options, credential)
+ .listDatabaseNames().into(dbNames);
+ return dbNames;
} catch (MongoException me) {
logger.warn("Failure while loading databases in Mongo. {}",
me.getMessage());
@@ -124,9 +123,11 @@ public class MongoSchemaFactory implements SchemaFactory {
@Override
public List<String> load(String dbName) throws Exception {
try {
- DB db = MongoCnxnManager.getClient(addresses, options, credential)
- .getDB(dbName);
- return new ArrayList<>(db.getCollectionNames());
+ MongoDatabase db = MongoCnxnManager.getClient(addresses, options,
+ credential).getDatabase(dbName);
+ List<String> collectionNames = new ArrayList<>();
+ db.listCollectionNames().into(collectionNames);
+ return collectionNames;
} catch (MongoException me) {
logger.warn("Failure while getting collection names from '{}'. {}",
dbName, me.getMessage());
[2/3] drill git commit: DRILL-3177: Part 2, various Mongo plugin enhancements
Posted by ja...@apache.org.
DRILL-3177: Part 2, various Mongo plugin enhancements
- Fix issues with the initial Mongo 3 update patch.
- Update Mongo connection management so we don't create an unbounded number of MongoClients (a condensed sketch follows this list).
- Move from a static class to management inside the storage plugin.
- Increase the limit on the number of connections.
- Move Mongo cursor initialization into batch reading, so a query won't block indefinitely on this code.
- Update Mongo driver version to 3.0.2.
- Update host access to use all hosts rather than just the master.
- Remove references to the no-longer-used UnknownHostException.
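A condensed sketch of the cache-backed client pool described above; the full implementation is in the MongoStoragePlugin diff below, and the String key here is a simplification of the plugin's MongoCnxnKey:

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.mongodb.MongoClient;

class ClientPoolSketch {
  // Evicted clients are closed by the removal listener instead of leaking sockets.
  private final Cache<String, MongoClient> clients = CacheBuilder.newBuilder()
      .expireAfterAccess(24, TimeUnit.HOURS) // matches the expiry the patch configures
      .removalListener(new RemovalListener<String, MongoClient>() {
        @Override
        public void onRemoval(RemovalNotification<String, MongoClient> removal) {
          removal.getValue().close();
        }
      })
      .build();

  synchronized MongoClient getClient(String host) {
    MongoClient client = clients.getIfPresent(host);
    if (client == null) {
      client = new MongoClient(host); // real code passes credentials and options from the URI
      clients.put(host, client);
    }
    return client;
  }
}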
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/453b363a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/453b363a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/453b363a
Branch: refs/heads/master
Commit: 453b363a7e3c97a2d57984c5e9553413c2d05c6a
Parents: 5bb75b2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Jun 25 19:56:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700
----------------------------------------------------------------------
contrib/storage-mongo/pom.xml | 14 +-
.../exec/store/mongo/MongoCnxnManager.java | 80 ------
.../drill/exec/store/mongo/MongoGroupScan.java | 256 +++++++++----------
.../exec/store/mongo/MongoRecordReader.java | 68 +++--
.../exec/store/mongo/MongoScanBatchCreator.java | 7 +-
.../exec/store/mongo/MongoStoragePlugin.java | 70 ++++-
.../store/mongo/schema/MongoSchemaFactory.java | 32 +--
7 files changed, 235 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index ce4ad7e..2d7ad2d 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -34,13 +34,13 @@
<artifactId>drill-java-exec</artifactId>
<version>${project.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.0.1</version>
- <scope>compile</scope>
- </dependency>
+
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>3.0.2</version>
+ </dependency>
<!-- Test dependencie -->
<dependency>
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
deleted file mode 100644
index 35cc265..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo;
-
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
-import com.mongodb.ServerAddress;
-
-public class MongoCnxnManager {
-
- private static final Logger logger = LoggerFactory
- .getLogger(MongoCnxnManager.class);
- private static Cache<MongoCnxnKey, MongoClient> addressClientMap;
-
- static {
- addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .removalListener(new AddressCloser()).build();
- }
-
- private static class AddressCloser implements
- RemovalListener<MongoCnxnKey, MongoClient> {
- @Override
- public synchronized void onRemoval(
- RemovalNotification<MongoCnxnKey, MongoClient> removal) {
- removal.getValue().close();
- logger.debug("Closed connection to {}.", removal.getKey().toString());
- }
- }
-
- public synchronized static MongoClient getClient(
- List<ServerAddress> addresses, MongoClientOptions clientOptions,
- MongoCredential credential) throws UnknownHostException {
- // Take the first replica from the replicated servers
- ServerAddress serverAddress = addresses.get(0);
- String userName = credential == null ? null : credential.getUserName();
- MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
- MongoClient client = addressClientMap.getIfPresent(key);
- if (client == null) {
- if (credential != null) {
- List<MongoCredential> credentialList = Arrays.asList(credential);
- client = new MongoClient(addresses, credentialList, clientOptions);
- } else {
- client = new MongoClient(addresses, clientOptions);
- }
- addressClientMap.put(key, client);
- logger.debug("Created connection to {}.", key.toString());
- logger.debug("Number of open connections {}.", addressClientMap.size());
- }
- return client;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b7885d3..6bf4d92 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.mongo;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,10 +64,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
@@ -173,157 +169,155 @@ public class MongoGroupScan extends AbstractGroupScan implements
@SuppressWarnings({ "rawtypes" })
private void init() throws IOException {
- try {
- List<String> h = storagePluginConfig.getHosts();
- List<ServerAddress> addresses = Lists.newArrayList();
- for (String host : h) {
- addresses.add(new ServerAddress(host));
- }
- MongoClient client = MongoCnxnManager.getClient(addresses,
- storagePluginConfig.getMongoOptions(),
- storagePluginConfig.getMongoCrendials());
- chunksMapping = Maps.newHashMap();
- chunksInverseMapping = Maps.newLinkedHashMap();
- if (isShardedCluster(client)) {
- MongoDatabase db = client.getDatabase(CONFIG);
- MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
- Document filter = new Document();
- filter
- .put(
- NS,
- this.scanSpec.getDbName() + "."
- + this.scanSpec.getCollectionName());
-
- Document projection = new Document();
- projection.put(SHARD, select);
- projection.put(MIN, select);
- projection.put(MAX, select);
-
- FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
- MongoCursor<Document> iterator = chunkCursor.iterator();
-
- MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
-
- projection = new Document();
- projection.put(HOST, select);
-
- while (iterator.hasNext()) {
- Document chunkObj = iterator.next();
- String shardName = (String) chunkObj.get(SHARD);
- String chunkId = (String) chunkObj.get(ID);
- filter = new Document(ID, shardName);
- FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
- MongoCursor<Document> hostIterator = hostCursor.iterator();
- while (hostIterator.hasNext()) {
- Document hostObj = hostIterator.next();
- String hostEntry = (String) hostObj.get(HOST);
- String[] tagAndHost = StringUtils.split(hostEntry, '/');
- String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
- tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
- List<String> chunkHosts = Arrays.asList(hosts);
- //to get the address list from one of the shard nodes, need to get port.
- MongoClient shardClient = new MongoClient(hosts[0]);
- Set<ServerAddress> addressList = getPreferredHosts(shardClient, chunkHosts);
- if (addressList == null) {
- addressList = Sets.newHashSet();
- for (String host : chunkHosts) {
- addressList.add(new ServerAddress(host));
- }
- }
- chunksMapping.put(chunkId, addressList);
- ServerAddress address = addressList.iterator().next();
- List<ChunkInfo> chunkList = chunksInverseMapping.get(address
- .getHost());
- if (chunkList == null) {
- chunkList = Lists.newArrayList();
- chunksInverseMapping.put(address.getHost(), chunkList);
- }
- List<String> chunkHostsList = new ArrayList<String>();
- for(ServerAddress serverAddr : addressList){
- chunkHostsList.add(serverAddr.toString());
+
+ List<String> h = storagePluginConfig.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : h) {
+ addresses.add(new ServerAddress(host));
+ }
+ MongoClient client = storagePlugin.getClient();
+ chunksMapping = Maps.newHashMap();
+ chunksInverseMapping = Maps.newLinkedHashMap();
+ if (isShardedCluster(client)) {
+ MongoDatabase db = client.getDatabase(CONFIG);
+ MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
+ Document filter = new Document();
+ filter
+ .put(
+ NS,
+ this.scanSpec.getDbName() + "."
+ + this.scanSpec.getCollectionName());
+
+ Document projection = new Document();
+ projection.put(SHARD, select);
+ projection.put(MIN, select);
+ projection.put(MAX, select);
+
+ FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
+ MongoCursor<Document> iterator = chunkCursor.iterator();
+
+ MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
+
+ projection = new Document();
+ projection.put(HOST, select);
+
+ while (iterator.hasNext()) {
+ Document chunkObj = iterator.next();
+ String shardName = (String) chunkObj.get(SHARD);
+ String chunkId = (String) chunkObj.get(ID);
+ filter = new Document(ID, shardName);
+ FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
+ MongoCursor<Document> hostIterator = hostCursor.iterator();
+ while (hostIterator.hasNext()) {
+ Document hostObj = hostIterator.next();
+ String hostEntry = (String) hostObj.get(HOST);
+ String[] tagAndHost = StringUtils.split(hostEntry, '/');
+ String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
+ tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
+ List<String> chunkHosts = Arrays.asList(hosts);
+ Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses), chunkHosts);
+ if (addressList == null) {
+ addressList = Sets.newHashSet();
+ for (String host : chunkHosts) {
+ addressList.add(new ServerAddress(host));
}
- ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
- DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
-
- Map<String, Object> minFilters = Maps.newHashMap();
- Map minMap = minObj.toMap();
- Set keySet = minMap.keySet();
- for (Object keyObj : keySet) {
- Object object = minMap.get(keyObj);
- if (!(object instanceof MinKey)) {
- minFilters.put(keyObj.toString(), object);
- }
+ }
+ chunksMapping.put(chunkId, addressList);
+ ServerAddress address = addressList.iterator().next();
+ List<ChunkInfo> chunkList = chunksInverseMapping.get(address
+ .getHost());
+ if (chunkList == null) {
+ chunkList = Lists.newArrayList();
+ chunksInverseMapping.put(address.getHost(), chunkList);
+ }
+ List<String> chunkHostsList = new ArrayList<String>();
+ for (ServerAddress serverAddr : addressList) {
+ chunkHostsList.add(serverAddr.toString());
+ }
+ ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
+ Document minMap = (Document) chunkObj.get(MIN);
+
+ Map<String, Object> minFilters = Maps.newHashMap();
+ Set keySet = minMap.keySet();
+ for (Object keyObj : keySet) {
+ Object object = minMap.get(keyObj);
+ if (!(object instanceof MinKey)) {
+ minFilters.put(keyObj.toString(), object);
}
- chunkInfo.setMinFilters(minFilters);
-
- DBObject maxObj = (BasicDBObject) chunkObj.get(MAX);
- Map<String, Object> maxFilters = Maps.newHashMap();
- Map maxMap = maxObj.toMap();
- keySet = maxMap.keySet();
- for (Object keyObj : keySet) {
- Object object = maxMap.get(keyObj);
- if (!(object instanceof MaxKey)) {
- maxFilters.put(keyObj.toString(), object);
- }
+ }
+ chunkInfo.setMinFilters(minFilters);
+
+ Map<String, Object> maxFilters = Maps.newHashMap();
+ Map maxMap = (Document) chunkObj.get(MAX);
+ keySet = maxMap.keySet();
+ for (Object keyObj : keySet) {
+ Object object = maxMap.get(keyObj);
+ if (!(object instanceof MaxKey)) {
+ maxFilters.put(keyObj.toString(), object);
}
-
- chunkInfo.setMaxFilters(maxFilters);
- chunkList.add(chunkInfo);
}
+
+ chunkInfo.setMaxFilters(maxFilters);
+ chunkList.add(chunkInfo);
}
- } else {
- String chunkName = scanSpec.getDbName() + "."
- + scanSpec.getCollectionName();
- List<String> hosts = storagePluginConfig.getHosts();
- Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
- if (addressList == null) {
- addressList = Sets.newHashSet();
- for (String host : hosts) {
- addressList.add(new ServerAddress(host));
- }
+ }
+ } else {
+ String chunkName = scanSpec.getDbName() + "."
+ + scanSpec.getCollectionName();
+ List<String> hosts = storagePluginConfig.getHosts();
+ Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
+ if (addressList == null) {
+ addressList = Sets.newHashSet();
+ for (String host : hosts) {
+ addressList.add(new ServerAddress(host));
}
- chunksMapping.put(chunkName, addressList);
-
- String host = hosts.get(0);
- ServerAddress address = new ServerAddress(host);
- ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
- chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
- chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
- List<ChunkInfo> chunksList = Lists.newArrayList();
- chunksList.add(chunkInfo);
- chunksInverseMapping.put(address.getHost(), chunksList);
}
- } catch (UnknownHostException e) {
- throw new DrillRuntimeException(e.getMessage(), e);
+ chunksMapping.put(chunkName, addressList);
+
+ String host = hosts.get(0);
+ ServerAddress address = new ServerAddress(host);
+ ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
+ chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
+ chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+ List<ChunkInfo> chunksList = Lists.newArrayList();
+ chunksList.add(chunkInfo);
+ chunksInverseMapping.put(address.getHost(), chunksList);
}
}
@SuppressWarnings("unchecked")
- private Set<ServerAddress> getPreferredHosts(MongoClient client,
- List<String> hosts) throws UnknownHostException {
+ private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) {
Set<ServerAddress> addressList = Sets.newHashSet();
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
ReadPreference readPreference = client.getReadPreference();
Document command = db.runCommand(new Document("isMaster", 1));
+ final String primaryHost = command.getString("primary");
+ final List<String> hostsList = (List<String>) command.get("hosts");
+
switch (readPreference.getName().toUpperCase()) {
case "PRIMARY":
case "PRIMARYPREFERRED":
- String primaryHost = command.getString("primary");
+ if (primaryHost == null) {
+ return null;
+ }
addressList.add(new ServerAddress(primaryHost));
return addressList;
case "SECONDARY":
case "SECONDARYPREFERRED":
- primaryHost = command.getString("primary");
- List<String> hostsList = (List<String>) command.get("hosts");
+ if (primaryHost == null || hostsList == null) {
+ return null;
+ }
hostsList.remove(primaryHost);
for (String host : hostsList) {
addressList.add(new ServerAddress(host));
}
return addressList;
case "NEAREST":
- hostsList = (List<String>) command.get("hosts");
+ if (hostsList == null) {
+ return null;
+ }
for (String host : hostsList) {
addressList.add(new ServerAddress(host));
}
@@ -468,16 +462,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public ScanStats getScanStats() {
- MongoClientURI clientURI = new MongoClientURI(
- this.storagePluginConfig.getConnection());
- try {
- List<String> hosts = clientURI.getHosts();
- List<ServerAddress> addresses = Lists.newArrayList();
- for (String host : hosts) {
- addresses.add(new ServerAddress(host));
- }
- MongoClient client = MongoCnxnManager.getClient(addresses,
- clientURI.getOptions(), clientURI.getCredentials());
+ try{
+ MongoClient client = storagePlugin.getClient();
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
MongoCollection<Document> collection = db.getCollection(scanSpec
.getCollectionName());
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 40fc810..0ac519f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.mongo;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -47,8 +46,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@@ -64,27 +61,29 @@ public class MongoRecordReader extends AbstractRecordReader {
private VectorContainerWriter writer;
private BasicDBObject filters;
- private BasicDBObject fields;
+ private final BasicDBObject fields;
- private MongoClientOptions clientOptions;
- private MongoCredential credential;
- private FragmentContext fragmentContext;
+ private final FragmentContext fragmentContext;
private OperatorContext operatorContext;
- private Boolean enableAllTextMode;
- private Boolean readNumbersAsDouble;
+ private final MongoStoragePlugin plugin;
- public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
- List<SchemaPath> projectedColumns, FragmentContext context,
- MongoClientOptions clientOptions, MongoCredential credential) {
- this.clientOptions = clientOptions;
- this.credential = credential;
- this.fields = new BasicDBObject();
+ private final boolean enableAllTextMode;
+ private final boolean readNumbersAsDouble;
+
+ public MongoRecordReader(
+ MongoSubScan.MongoSubScanSpec subScanSpec,
+ List<SchemaPath> projectedColumns,
+ FragmentContext context,
+ MongoStoragePlugin plugin) {
+
+ fields = new BasicDBObject();
// exclude _id field, if not mentioned by user.
- this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+ fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
setColumns(projectedColumns);
- this.fragmentContext = context;
- this.filters = new BasicDBObject();
+ fragmentContext = context;
+ this.plugin = plugin;
+ filters = new BasicDBObject();
Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
buildFilters(subScanSpec.getFilter(), mergedFilters);
@@ -94,8 +93,7 @@ public class MongoRecordReader extends AbstractRecordReader {
}
@Override
- protected Collection<SchemaPath> transformColumns(
- Collection<SchemaPath> projectedColumns) {
+ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
if (!isStarQuery()) {
for (SchemaPath column : projectedColumns ) {
@@ -132,19 +130,14 @@ public class MongoRecordReader extends AbstractRecordReader {
}
private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
- try {
- List<String> hosts = subScanSpec.getHosts();
- List<ServerAddress> addresses = Lists.newArrayList();
- for (String host : hosts) {
- addresses.add(new ServerAddress(host));
- }
- MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
- credential);
- MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
- collection = db.getCollection(subScanSpec.getCollectionName());
- } catch (UnknownHostException e) {
- throw new DrillRuntimeException(e.getMessage(), e);
+ List<String> hosts = subScanSpec.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : hosts) {
+ addresses.add(new ServerAddress(host));
}
+ MongoClient client = plugin.getClient(addresses);
+ MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
+ collection = db.getCollection(subScanSpec.getCollectionName());
}
@Override
@@ -152,13 +145,18 @@ public class MongoRecordReader extends AbstractRecordReader {
this.operatorContext = context;
this.writer = new VectorContainerWriter(output);
this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
- logger.info("Filters Applied : " + filters);
- logger.info("Fields Selected :" + fields);
- cursor = collection.find(filters).projection(fields).iterator();
+
}
@Override
public int next() {
+ if(cursor == null){
+ logger.info("Filters Applied : " + filters);
+ logger.info("Fields Selected :" + fields);
+ cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+ }
+
+
writer.allocate();
writer.reset();
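The change above defers cursor creation from setup to the first next() call. A minimal sketch of that lazy-initialization pattern, with illustrative class and constant names:

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

class LazyCursorSketch {
  private final MongoCollection<Document> collection;
  private final Document filters = new Document();
  private final Document fields = new Document();
  private MongoCursor<Document> cursor; // intentionally not opened in setup

  LazyCursorSketch(MongoCollection<Document> collection) {
    this.collection = collection;
  }

  int next() {
    if (cursor == null) {
      // The first batch read opens the cursor, so setup never blocks on the server.
      cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
    }
    int docCount = 0;
    while (docCount < 4096 && cursor.hasNext()) { // 4096 stands in for INITIAL_VALUE_ALLOCATION
      cursor.next(); // a real reader writes each document into value vectors here
      docCount++;
    }
    return docCount;
  }
}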
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 3a8a496..49b1750 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -51,12 +51,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
if ((columns = subScan.getColumns()) == null) {
columns = GroupScan.ALL_COLUMNS;
}
- MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
- .getMongoOptions();
- MongoCredential mongoCrendials = subScan.getMongoPluginConfig()
- .getMongoCrendials();
- readers.add(new MongoRecordReader(scanSpec, columns, context,
- clientOptions, mongoCrendials));
+ readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
} catch (Exception e) {
logger.error("MongoRecordReader creation failed for subScan: "
+ subScan + ".");
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 38bc91d..093df57 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -18,10 +18,13 @@
package org.apache.drill.exec.store.mongo;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.calcite.schema.SchemaPlus;
-
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -35,21 +38,36 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
public class MongoStoragePlugin extends AbstractStoragePlugin {
static final Logger logger = LoggerFactory
.getLogger(MongoStoragePlugin.class);
- private DrillbitContext context;
- private MongoStoragePluginConfig mongoConfig;
- private MongoSchemaFactory schemaFactory;
+ private final DrillbitContext context;
+ private final MongoStoragePluginConfig mongoConfig;
+ private final MongoSchemaFactory schemaFactory;
+ private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
+ private final MongoClientURI clientURI;
public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig,
DrillbitContext context, String name) throws IOException,
ExecutionSetupException {
this.context = context;
this.mongoConfig = mongoConfig;
+ this.clientURI = new MongoClientURI(this.mongoConfig.getConnection());
+ this.addressClientMap = CacheBuilder.newBuilder()
+ .expireAfterAccess(24, TimeUnit.HOURS)
+ .removalListener(new AddressCloser()).build();
this.schemaFactory = new MongoSchemaFactory(this, name);
}
@@ -82,4 +100,48 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
}
+
+ private class AddressCloser implements
+ RemovalListener<MongoCnxnKey, MongoClient> {
+ @Override
+ public synchronized void onRemoval(
+ RemovalNotification<MongoCnxnKey, MongoClient> removal) {
+ removal.getValue().close();
+ logger.debug("Closed connection to {}.", removal.getKey().toString());
+ }
+ }
+
+ public MongoClient getClient(String host) {
+ return getClient(Collections.singletonList(new ServerAddress(host)));
+ }
+
+ public MongoClient getClient() {
+ List<String> hosts = clientURI.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : hosts) {
+ addresses.add(new ServerAddress(host));
+ }
+ return getClient(addresses);
+ }
+
+ public synchronized MongoClient getClient(List<ServerAddress> addresses) {
+ // Take the first replica from the replicated servers
+ final ServerAddress serverAddress = addresses.get(0);
+ final MongoCredential credential = clientURI.getCredentials();
+ String userName = credential == null ? null : credential.getUserName();
+ MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
+ MongoClient client = addressClientMap.getIfPresent(key);
+ if (client == null) {
+ if (credential != null) {
+ List<MongoCredential> credentialList = Arrays.asList(credential);
+ client = new MongoClient(addresses, credentialList, clientURI.getOptions());
+ } else {
+ client = new MongoClient(addresses, clientURI.getOptions());
+ }
+ addressClientMap.put(key, client);
+ logger.debug("Created connection to {}.", key.toString());
+ logger.debug("Number of open connections {}.", addressClientMap.size());
+ }
+ return client;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index d453fb9..9d08abd 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.mongo.schema;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.mongo.MongoCnxnManager;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
@@ -46,13 +44,10 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
+import com.mongodb.MongoClient;
import com.mongodb.MongoException;
-import com.mongodb.ServerAddress;
import com.mongodb.client.MongoDatabase;
public class MongoSchemaFactory implements SchemaFactory {
@@ -66,23 +61,12 @@ public class MongoSchemaFactory implements SchemaFactory {
private LoadingCache<String, List<String>> tableNameLoader;
private final String schemaName;
private final MongoStoragePlugin plugin;
+ private final MongoClient client;
- private final List<ServerAddress> addresses;
- private final MongoClientOptions options;
- private final MongoCredential credential;
-
- public MongoSchemaFactory(MongoStoragePlugin schema, String schemaName)
- throws ExecutionSetupException, UnknownHostException {
- this.plugin = schema;
+ public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+ this.plugin = plugin;
this.schemaName = schemaName;
-
- List<String> hosts = plugin.getConfig().getHosts();
- addresses = Lists.newArrayList();
- for (String host : hosts) {
- addresses.add(new ServerAddress(host));
- }
- options = plugin.getConfig().getMongoOptions();
- credential = plugin.getConfig().getMongoCrendials();
+ this.client = plugin.getClient();
databases = CacheBuilder //
.newBuilder() //
@@ -104,8 +88,7 @@ public class MongoSchemaFactory implements SchemaFactory {
}
try {
List<String> dbNames = new ArrayList<>();
- MongoCnxnManager.getClient(addresses, options, credential)
- .listDatabaseNames().into(dbNames);
+ client.listDatabaseNames().into(dbNames);
return dbNames;
} catch (MongoException me) {
logger.warn("Failure while loading databases in Mongo. {}",
@@ -123,8 +106,7 @@ public class MongoSchemaFactory implements SchemaFactory {
@Override
public List<String> load(String dbName) throws Exception {
try {
- MongoDatabase db = MongoCnxnManager.getClient(addresses, options,
- credential).getDatabase(dbName);
+ MongoDatabase db = client.getDatabase(dbName);
List<String> collectionNames = new ArrayList<>();
db.listCollectionNames().into(collectionNames);
return collectionNames;