You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/06/26 05:31:02 UTC

[1/3] drill git commit: DRILL-2592: Make jdbc-all dependencies non-transitive (provided scope)

Repository: drill
Updated Branches:
  refs/heads/master c7c223662 -> 6503cfbff


DRILL-2592: Make jdbc-all dependencies non-transitive (provided scope)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6503cfbf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6503cfbf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6503cfbf

Branch: refs/heads/master
Commit: 6503cfbff7d52f5e409ac7e73647871a8ebec79f
Parents: 453b363
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Jun 25 10:24:32 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700

----------------------------------------------------------------------
 exec/jdbc-all/pom.xml | 99 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6503cfbf/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index d3525f7..3052e40 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -28,15 +28,99 @@
   <name>JDBC JAR with all dependencies</name>
 
   <dependencies>
+  
+    <!-- start parent dependencies -->
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+      <version>4.0.27.Final</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${dep.slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <version>${dep.slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <version>${dep.slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>log4j-over-slf4j</artifactId>
+      <version>${dep.slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.googlecode.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <version>1.3</version>
+      <scope>provided</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${dep.junit.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.9.5</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>1.0.13</version>
+      <scope>provided</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>de.huxhorn.lilith</groupId>
+      <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
+      <version>0.9.44</version>
+      <scope>provided</scope>
+    </dependency>
+        
+    <!-- end parent dependencies -->
+            
     <dependency>
       <groupId>net.hydromatic</groupId>
-        <artifactId>optiq-avatica</artifactId>
-        <version>0.9-drill-r20</version>
+      <artifactId>optiq-avatica</artifactId>
+      <version>0.9-drill-r20</version>
+      <scope>provided</scope>
     </dependency>
+
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>log4j</groupId>
@@ -124,6 +208,7 @@
       <groupId>org.apache.drill</groupId>
       <artifactId>drill-common</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <artifactId>javassist</artifactId>
@@ -135,6 +220,7 @@
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-jdbc</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <artifactId>drill-storage-hive-core</artifactId>
@@ -151,11 +237,13 @@
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>2.6.1</version>
+      <scope>provided</scope>
     </dependency>
     <!-- Specify xalan and xerces versions to avoid setXIncludeAware error. -->
     <dependency>
       <groupId>xerces</groupId>
       <artifactId>xercesImpl</artifactId>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>xml-apis</groupId>
@@ -166,6 +254,7 @@
     <dependency>
       <groupId>xalan</groupId>
       <artifactId>xalan</artifactId>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>xml-apis</groupId>
@@ -177,11 +266,13 @@
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <version>1.0.13</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-core</artifactId>
       <version>1.0.13</version>
+      <scope>provided</scope>
     </dependency>
   </dependencies>
 
@@ -242,8 +333,8 @@
           </options>
           <exclusions>
             <exclusion>
-                <groupId>org.slf4j</groupId>
-                <artifactId>jcl-over-slf4j</artifactId>
+              <groupId>org.slf4j</groupId>
+              <artifactId>jcl-over-slf4j</artifactId>
             </exclusion>
           </exclusions>
           <libs>


[3/3] drill git commit: DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1

Posted by ja...@apache.org.
DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5bb75b2d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5bb75b2d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5bb75b2d

Branch: refs/heads/master
Commit: 5bb75b2d1fc23586aff40ebfd93a7ce2084d01ad
Parents: c7c2236
Author: Kamesh <ka...@gmail.com>
Authored: Sat May 23 07:38:26 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700

----------------------------------------------------------------------
 contrib/storage-mongo/pom.xml                   |  2 +-
 .../drill/exec/store/mongo/MongoGroupScan.java  | 76 ++++++++++----------
 .../exec/store/mongo/MongoRecordReader.java     | 21 +++---
 .../drill/exec/store/mongo/MongoUtils.java      | 12 ----
 .../store/mongo/schema/MongoSchemaFactory.java  | 21 +++---
 5 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 8168861..ce4ad7e 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
 	  <groupId>org.mongodb</groupId>
 	  <artifactId>mongo-java-driver</artifactId>
-	  <version>2.12.2</version>
+	  <version>3.0.1</version>
 	  <scope>compile</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index e33d2ae..b7885d3 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
 import org.apache.drill.exec.store.mongo.common.ChunkInfo;
+import org.bson.Document;
 import org.bson.types.MaxKey;
 import org.bson.types.MinKey;
 import org.slf4j.Logger;
@@ -65,15 +66,15 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
-import com.mongodb.CommandResult;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
 
 @JsonTypeName("mongo-scan")
 public class MongoGroupScan extends AbstractGroupScan implements
@@ -165,12 +166,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   private boolean isShardedCluster(MongoClient client) {
-    DB db = client.getDB(scanSpec.getDbName());
-    String msg = db.command("isMaster").getString("msg");
+    MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+    String msg = db.runCommand(new Document("isMaster", 1)).getString("msg");
     return msg == null ? false : msg.equals("isdbgrid");
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes" })
   private void init() throws IOException {
     try {
       List<String> h = storagePluginConfig.getHosts();
@@ -184,35 +185,37 @@ public class MongoGroupScan extends AbstractGroupScan implements
       chunksMapping = Maps.newHashMap();
       chunksInverseMapping = Maps.newLinkedHashMap();
       if (isShardedCluster(client)) {
-        DB db = client.getDB(CONFIG);
-        DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
-        DBObject query = new BasicDBObject(1);
-        query
+        MongoDatabase db = client.getDatabase(CONFIG);
+        MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
+        Document filter = new Document();
+        filter
             .put(
                 NS,
                 this.scanSpec.getDbName() + "."
                     + this.scanSpec.getCollectionName());
 
-        DBObject fields = new BasicDBObject();
-        fields.put(SHARD, select);
-        fields.put(MIN, select);
-        fields.put(MAX, select);
+        Document projection = new Document();
+        projection.put(SHARD, select);
+        projection.put(MIN, select);
+        projection.put(MAX, select);
 
-        DBCursor chunkCursor = chunksCollection.find(query, fields);
+        FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
+        MongoCursor<Document> iterator = chunkCursor.iterator();
 
-        DBCollection shardsCollection = db.getCollectionFromString(SHARDS);
+        MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
 
-        fields = new BasicDBObject();
-        fields.put(HOST, select);
+        projection = new Document();
+        projection.put(HOST, select);
 
-        while (chunkCursor.hasNext()) {
-          DBObject chunkObj = chunkCursor.next();
+        while (iterator.hasNext()) {
+          Document chunkObj = iterator.next();
           String shardName = (String) chunkObj.get(SHARD);
           String chunkId = (String) chunkObj.get(ID);
-          query = new BasicDBObject().append(ID, shardName);
-          DBCursor hostCursor = shardsCollection.find(query, fields);
-          while (hostCursor.hasNext()) {
-            DBObject hostObj = hostCursor.next();
+          filter = new Document(ID, shardName);
+          FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
+          MongoCursor<Document> hostIterator = hostCursor.iterator();
+          while (hostIterator.hasNext()) {
+            Document hostObj = hostIterator.next();
             String hostEntry = (String) hostObj.get(HOST);
             String[] tagAndHost = StringUtils.split(hostEntry, '/');
             String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
@@ -300,27 +303,27 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private Set<ServerAddress> getPreferredHosts(MongoClient client,
       List<String> hosts) throws UnknownHostException {
     Set<ServerAddress> addressList = Sets.newHashSet();
-    DB db = client.getDB(scanSpec.getDbName());
+    MongoDatabase db = client.getDatabase(scanSpec.getDbName());
     ReadPreference readPreference = client.getReadPreference();
+    Document command = db.runCommand(new Document("isMaster", 1));
+
     switch (readPreference.getName().toUpperCase()) {
     case "PRIMARY":
     case "PRIMARYPREFERRED":
-      String primaryHost = db.command("isMaster").getString("primary");
+      String primaryHost = command.getString("primary");
       addressList.add(new ServerAddress(primaryHost));
       return addressList;
     case "SECONDARY":
     case "SECONDARYPREFERRED":
-      primaryHost = db.command("isMaster").getString("primary");
-      @SuppressWarnings("unchecked")
-      List<String> hostsList = (List<String>) db.command("isMaster").get(
-          "hosts");
+      primaryHost = command.getString("primary");
+      List<String> hostsList = (List<String>) command.get("hosts");
       hostsList.remove(primaryHost);
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
       return addressList;
     case "NEAREST":
-      hostsList = (List<String>) db.command("isMaster").get("hosts");
+      hostsList = (List<String>) command.get("hosts");
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
@@ -475,12 +478,13 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
       MongoClient client = MongoCnxnManager.getClient(addresses,
           clientURI.getOptions(), clientURI.getCredentials());
-      DB db = client.getDB(scanSpec.getDbName());
-      DBCollection collection = db.getCollectionFromString(scanSpec
+      MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+      MongoCollection<Document> collection = db.getCollection(scanSpec
           .getCollectionName());
-      CommandResult stats = collection.getStats();
+      String json = collection.find().first().toJson();
+      float approxDiskCost = json.getBytes().length * collection.count();
       return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
-          stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE));
+          collection.count(), 1, approxDiskCost);
     } catch (Exception e) {
       throw new DrillRuntimeException(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 182f5a4..40fc810 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,27 +46,25 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
-import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
 
 public class MongoRecordReader extends AbstractRecordReader {
   static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
-  private DBCollection collection;
-  private DBCursor cursor;
+  private MongoCollection<Document> collection;
+  private MongoCursor<Document> cursor;
 
   private JsonReader jsonReader;
   private VectorContainerWriter writer;
 
   private BasicDBObject filters;
-  private DBObject fields;
+  private BasicDBObject fields;
 
   private MongoClientOptions clientOptions;
   private MongoCredential credential;
@@ -141,7 +140,7 @@ public class MongoRecordReader extends AbstractRecordReader {
       }
       MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
           credential);
-      DB db = client.getDB(subScanSpec.getDbName());
+      MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
       collection = db.getCollection(subScanSpec.getCollectionName());
     } catch (UnknownHostException e) {
       throw new DrillRuntimeException(e.getMessage(), e);
@@ -155,7 +154,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
     logger.info("Filters Applied : " + filters);
     logger.info("Fields Selected :" + fields);
-    cursor = collection.find(filters, fields);
+    cursor = collection.find(filters).projection(fields).iterator();
   }
 
   @Override
@@ -170,7 +169,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     try {
       while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) {
         writer.setPosition(docCount);
-        String doc = cursor.next().toString();
+        String doc = cursor.next().toJson();
         jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
         jsonReader.write(writer);
         docCount++;

http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
index b43a22f..a5fb2ad 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
@@ -22,13 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.bson.LazyBSONCallback;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
-import com.mongodb.LazyWriteableDBObject;
 
 public class MongoUtils {
 
@@ -52,14 +48,6 @@ public class MongoUtils {
     return orQueryFilter;
   }
 
-  public static BasicDBObject deserializeFilter(byte[] filterBytes) {
-    DBObject dbo = new LazyWriteableDBObject(filterBytes,
-        new LazyBSONCallback());
-    BasicDBObject result = new BasicDBObject();
-    result.putAll(dbo);
-    return result;
-  }
-
   public static Map<String, List<BasicDBObject>> mergeFilters(
       Map<String, Object> minFilters, Map<String, Object> maxFilters) {
     Map<String, List<BasicDBObject>> filters = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index fccffb5..d453fb9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -27,10 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Maps;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -50,13 +47,13 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.mongodb.DB;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
 import com.mongodb.MongoException;
-import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoDatabase;
 
 public class MongoSchemaFactory implements SchemaFactory {
 
@@ -106,8 +103,10 @@ public class MongoSchemaFactory implements SchemaFactory {
         throw new UnsupportedOperationException();
       }
       try {
-        return MongoCnxnManager.getClient(addresses, options, credential)
-            .getDatabaseNames();
+        List<String> dbNames = new ArrayList<>();
+        MongoCnxnManager.getClient(addresses, options, credential)
+            .listDatabaseNames().into(dbNames);
+        return dbNames;
       } catch (MongoException me) {
         logger.warn("Failure while loading databases in Mongo. {}",
             me.getMessage());
@@ -124,9 +123,11 @@ public class MongoSchemaFactory implements SchemaFactory {
     @Override
     public List<String> load(String dbName) throws Exception {
       try {
-        DB db = MongoCnxnManager.getClient(addresses, options, credential)
-            .getDB(dbName);
-        return new ArrayList<>(db.getCollectionNames());
+        MongoDatabase db = MongoCnxnManager.getClient(addresses, options,
+            credential).getDatabase(dbName);
+        List<String> collectionNames = new ArrayList<>();
+        db.listCollectionNames().into(collectionNames);
+        return collectionNames;
       } catch (MongoException me) {
         logger.warn("Failure while getting collection names from '{}'. {}",
             dbName, me.getMessage());


[2/3] drill git commit: DRILL-3177: Part 2, various Mongo plugin enhancements

Posted by ja...@apache.org.
DRILL-3177: Part 2, various Mongo plugin enhancements

- Fix issues with initial Mongo 3 update patch
- Update mongo connection management so we don't generate infinite MongoClients.
- Move from static class to management inside Storage Plugin.
- Increase limit on number of connections.
- move mongo cursor initialization to batch reading, so a query won't block indefinitely on this code
- Update mongo driver version to 3.0.2
- Update host access to use all hosts rather than just master
- Remove references to no longer used UnknownHostException


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/453b363a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/453b363a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/453b363a

Branch: refs/heads/master
Commit: 453b363a7e3c97a2d57984c5e9553413c2d05c6a
Parents: 5bb75b2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Jun 25 19:56:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700

----------------------------------------------------------------------
 contrib/storage-mongo/pom.xml                   |  14 +-
 .../exec/store/mongo/MongoCnxnManager.java      |  80 ------
 .../drill/exec/store/mongo/MongoGroupScan.java  | 256 +++++++++----------
 .../exec/store/mongo/MongoRecordReader.java     |  68 +++--
 .../exec/store/mongo/MongoScanBatchCreator.java |   7 +-
 .../exec/store/mongo/MongoStoragePlugin.java    |  70 ++++-
 .../store/mongo/schema/MongoSchemaFactory.java  |  32 +--
 7 files changed, 235 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index ce4ad7e..2d7ad2d 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -34,13 +34,13 @@
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
-    
-    <dependency>
-	  <groupId>org.mongodb</groupId>
-	  <artifactId>mongo-java-driver</artifactId>
-	  <version>3.0.1</version>
-	  <scope>compile</scope>
-    </dependency>
+
+
+  <dependency>
+    <groupId>org.mongodb</groupId>
+    <artifactId>mongo-java-driver</artifactId>
+    <version>3.0.2</version>
+  </dependency>
 
     <!-- Test dependencie -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
deleted file mode 100644
index 35cc265..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo;
-
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
-import com.mongodb.ServerAddress;
-
-public class MongoCnxnManager {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(MongoCnxnManager.class);
-  private static Cache<MongoCnxnKey, MongoClient> addressClientMap;
-
-  static {
-    addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
-        .expireAfterAccess(10, TimeUnit.MINUTES)
-        .removalListener(new AddressCloser()).build();
-  }
-
-  private static class AddressCloser implements
-      RemovalListener<MongoCnxnKey, MongoClient> {
-    @Override
-    public synchronized void onRemoval(
-        RemovalNotification<MongoCnxnKey, MongoClient> removal) {
-      removal.getValue().close();
-      logger.debug("Closed connection to {}.", removal.getKey().toString());
-    }
-  }
-
-  public synchronized static MongoClient getClient(
-      List<ServerAddress> addresses, MongoClientOptions clientOptions,
-      MongoCredential credential) throws UnknownHostException {
-    // Take the first replica from the replicated servers
-    ServerAddress serverAddress = addresses.get(0);
-    String userName = credential == null ? null : credential.getUserName();
-    MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
-    MongoClient client = addressClientMap.getIfPresent(key);
-    if (client == null) {
-      if (credential != null) {
-        List<MongoCredential> credentialList = Arrays.asList(credential);
-        client = new MongoClient(addresses, credentialList, clientOptions);
-      } else {
-        client = new MongoClient(addresses, clientOptions);
-      }
-      addressClientMap.put(key, client);
-      logger.debug("Created connection to {}.", key.toString());
-      logger.debug("Number of open connections {}.", addressClientMap.size());
-    }
-    return client;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b7885d3..6bf4d92 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,10 +64,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.FindIterable;
@@ -173,157 +169,155 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @SuppressWarnings({ "rawtypes" })
   private void init() throws IOException {
-    try {
-      List<String> h = storagePluginConfig.getHosts();
-      List<ServerAddress> addresses = Lists.newArrayList();
-      for (String host : h) {
-        addresses.add(new ServerAddress(host));
-      }
-      MongoClient client = MongoCnxnManager.getClient(addresses,
-          storagePluginConfig.getMongoOptions(),
-          storagePluginConfig.getMongoCrendials());
-      chunksMapping = Maps.newHashMap();
-      chunksInverseMapping = Maps.newLinkedHashMap();
-      if (isShardedCluster(client)) {
-        MongoDatabase db = client.getDatabase(CONFIG);
-        MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
-        Document filter = new Document();
-        filter
-            .put(
-                NS,
-                this.scanSpec.getDbName() + "."
-                    + this.scanSpec.getCollectionName());
-
-        Document projection = new Document();
-        projection.put(SHARD, select);
-        projection.put(MIN, select);
-        projection.put(MAX, select);
-
-        FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
-        MongoCursor<Document> iterator = chunkCursor.iterator();
-
-        MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
-
-        projection = new Document();
-        projection.put(HOST, select);
-
-        while (iterator.hasNext()) {
-          Document chunkObj = iterator.next();
-          String shardName = (String) chunkObj.get(SHARD);
-          String chunkId = (String) chunkObj.get(ID);
-          filter = new Document(ID, shardName);
-          FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
-          MongoCursor<Document> hostIterator = hostCursor.iterator();
-          while (hostIterator.hasNext()) {
-            Document hostObj = hostIterator.next();
-            String hostEntry = (String) hostObj.get(HOST);
-            String[] tagAndHost = StringUtils.split(hostEntry, '/');
-            String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
-                tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
-            List<String> chunkHosts = Arrays.asList(hosts);
-            //to get the address list from one of the shard nodes, need to get port.
-            MongoClient shardClient = new MongoClient(hosts[0]);
-            Set<ServerAddress> addressList = getPreferredHosts(shardClient, chunkHosts);
-            if (addressList == null) {
-              addressList = Sets.newHashSet();
-              for (String host : chunkHosts) {
-                addressList.add(new ServerAddress(host));
-              }
-            }
-            chunksMapping.put(chunkId, addressList);
-            ServerAddress address = addressList.iterator().next();
-            List<ChunkInfo> chunkList = chunksInverseMapping.get(address
-                .getHost());
-            if (chunkList == null) {
-              chunkList = Lists.newArrayList();
-              chunksInverseMapping.put(address.getHost(), chunkList);
-            }
-            List<String> chunkHostsList = new ArrayList<String>();
-            for(ServerAddress serverAddr : addressList){
-              chunkHostsList.add(serverAddr.toString());
+
+    List<String> h = storagePluginConfig.getHosts();
+    List<ServerAddress> addresses = Lists.newArrayList();
+    for (String host : h) {
+      addresses.add(new ServerAddress(host));
+    }
+    MongoClient client = storagePlugin.getClient();
+    chunksMapping = Maps.newHashMap();
+    chunksInverseMapping = Maps.newLinkedHashMap();
+    if (isShardedCluster(client)) {
+      MongoDatabase db = client.getDatabase(CONFIG);
+      MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
+      Document filter = new Document();
+      filter
+          .put(
+              NS,
+              this.scanSpec.getDbName() + "."
+                  + this.scanSpec.getCollectionName());
+
+      Document projection = new Document();
+      projection.put(SHARD, select);
+      projection.put(MIN, select);
+      projection.put(MAX, select);
+
+      FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
+      MongoCursor<Document> iterator = chunkCursor.iterator();
+
+      MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
+
+      projection = new Document();
+      projection.put(HOST, select);
+
+      while (iterator.hasNext()) {
+        Document chunkObj = iterator.next();
+        String shardName = (String) chunkObj.get(SHARD);
+        String chunkId = (String) chunkObj.get(ID);
+        filter = new Document(ID, shardName);
+        FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
+        MongoCursor<Document> hostIterator = hostCursor.iterator();
+        while (hostIterator.hasNext()) {
+          Document hostObj = hostIterator.next();
+          String hostEntry = (String) hostObj.get(HOST);
+          String[] tagAndHost = StringUtils.split(hostEntry, '/');
+          String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
+              tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
+          List<String> chunkHosts = Arrays.asList(hosts);
+          Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses), chunkHosts);
+          if (addressList == null) {
+            addressList = Sets.newHashSet();
+            for (String host : chunkHosts) {
+              addressList.add(new ServerAddress(host));
             }
-            ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
-            DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
-
-            Map<String, Object> minFilters = Maps.newHashMap();
-            Map minMap = minObj.toMap();
-            Set keySet = minMap.keySet();
-            for (Object keyObj : keySet) {
-              Object object = minMap.get(keyObj);
-              if (!(object instanceof MinKey)) {
-                minFilters.put(keyObj.toString(), object);
-              }
+          }
+          chunksMapping.put(chunkId, addressList);
+          ServerAddress address = addressList.iterator().next();
+          List<ChunkInfo> chunkList = chunksInverseMapping.get(address
+              .getHost());
+          if (chunkList == null) {
+            chunkList = Lists.newArrayList();
+            chunksInverseMapping.put(address.getHost(), chunkList);
+          }
+          List<String> chunkHostsList = new ArrayList<String>();
+          for (ServerAddress serverAddr : addressList) {
+            chunkHostsList.add(serverAddr.toString());
+          }
+          ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
+          Document minMap = (Document) chunkObj.get(MIN);
+
+          Map<String, Object> minFilters = Maps.newHashMap();
+          Set keySet = minMap.keySet();
+          for (Object keyObj : keySet) {
+            Object object = minMap.get(keyObj);
+            if (!(object instanceof MinKey)) {
+              minFilters.put(keyObj.toString(), object);
             }
-            chunkInfo.setMinFilters(minFilters);
-
-            DBObject maxObj = (BasicDBObject) chunkObj.get(MAX);
-            Map<String, Object> maxFilters = Maps.newHashMap();
-            Map maxMap = maxObj.toMap();
-            keySet = maxMap.keySet();
-            for (Object keyObj : keySet) {
-              Object object = maxMap.get(keyObj);
-              if (!(object instanceof MaxKey)) {
-                maxFilters.put(keyObj.toString(), object);
-              }
+          }
+          chunkInfo.setMinFilters(minFilters);
+
+          Map<String, Object> maxFilters = Maps.newHashMap();
+          Map maxMap = (Document) chunkObj.get(MAX);
+          keySet = maxMap.keySet();
+          for (Object keyObj : keySet) {
+            Object object = maxMap.get(keyObj);
+            if (!(object instanceof MaxKey)) {
+              maxFilters.put(keyObj.toString(), object);
             }
-
-            chunkInfo.setMaxFilters(maxFilters);
-            chunkList.add(chunkInfo);
           }
+
+          chunkInfo.setMaxFilters(maxFilters);
+          chunkList.add(chunkInfo);
         }
-      } else {
-        String chunkName = scanSpec.getDbName() + "."
-            + scanSpec.getCollectionName();
-        List<String> hosts = storagePluginConfig.getHosts();
-        Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
-        if (addressList == null) {
-          addressList = Sets.newHashSet();
-          for (String host : hosts) {
-            addressList.add(new ServerAddress(host));
-          }
+      }
+    } else {
+      String chunkName = scanSpec.getDbName() + "."
+          + scanSpec.getCollectionName();
+      List<String> hosts = storagePluginConfig.getHosts();
+      Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
+      if (addressList == null) {
+        addressList = Sets.newHashSet();
+        for (String host : hosts) {
+          addressList.add(new ServerAddress(host));
         }
-        chunksMapping.put(chunkName, addressList);
-
-        String host = hosts.get(0);
-        ServerAddress address = new ServerAddress(host);
-        ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
-        chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
-        chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
-        List<ChunkInfo> chunksList = Lists.newArrayList();
-        chunksList.add(chunkInfo);
-        chunksInverseMapping.put(address.getHost(), chunksList);
       }
-    } catch (UnknownHostException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+      chunksMapping.put(chunkName, addressList);
+
+      String host = hosts.get(0);
+      ServerAddress address = new ServerAddress(host);
+      ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
+      chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
+      chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+      List<ChunkInfo> chunksList = Lists.newArrayList();
+      chunksList.add(chunkInfo);
+      chunksInverseMapping.put(address.getHost(), chunksList);
     }
 
   }
 
   @SuppressWarnings("unchecked")
-  private Set<ServerAddress> getPreferredHosts(MongoClient client,
-      List<String> hosts) throws UnknownHostException {
+  private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) {
     Set<ServerAddress> addressList = Sets.newHashSet();
     MongoDatabase db = client.getDatabase(scanSpec.getDbName());
     ReadPreference readPreference = client.getReadPreference();
     Document command = db.runCommand(new Document("isMaster", 1));
 
+    final String primaryHost = command.getString("primary");
+    final List<String> hostsList = (List<String>) command.get("hosts");
+
     switch (readPreference.getName().toUpperCase()) {
     case "PRIMARY":
     case "PRIMARYPREFERRED":
-      String primaryHost = command.getString("primary");
+      if (primaryHost == null) {
+        return null;
+      }
       addressList.add(new ServerAddress(primaryHost));
       return addressList;
     case "SECONDARY":
     case "SECONDARYPREFERRED":
-      primaryHost = command.getString("primary");
-      List<String> hostsList = (List<String>) command.get("hosts");
+      if (primaryHost == null || hostsList == null) {
+        return null;
+      }
       hostsList.remove(primaryHost);
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
       return addressList;
     case "NEAREST":
-      hostsList = (List<String>) command.get("hosts");
+      if (hostsList == null) {
+        return null;
+      }
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
@@ -468,16 +462,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public ScanStats getScanStats() {
-    MongoClientURI clientURI = new MongoClientURI(
-        this.storagePluginConfig.getConnection());
-    try {
-      List<String> hosts = clientURI.getHosts();
-      List<ServerAddress> addresses = Lists.newArrayList();
-      for (String host : hosts) {
-        addresses.add(new ServerAddress(host));
-      }
-      MongoClient client = MongoCnxnManager.getClient(addresses,
-          clientURI.getOptions(), clientURI.getCredentials());
+    try{
+      MongoClient client = storagePlugin.getClient();
       MongoDatabase db = client.getDatabase(scanSpec.getDbName());
       MongoCollection<Document> collection = db.getCollection(scanSpec
           .getCollectionName());

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 40fc810..0ac519f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -47,8 +46,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -64,27 +61,29 @@ public class MongoRecordReader extends AbstractRecordReader {
   private VectorContainerWriter writer;
 
   private BasicDBObject filters;
-  private BasicDBObject fields;
+  private final BasicDBObject fields;
 
-  private MongoClientOptions clientOptions;
-  private MongoCredential credential;
-  private FragmentContext fragmentContext;
+  private final FragmentContext fragmentContext;
   private OperatorContext operatorContext;
 
-  private Boolean enableAllTextMode;
-  private Boolean readNumbersAsDouble;
+  private final MongoStoragePlugin plugin;
 
-  public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
-      List<SchemaPath> projectedColumns, FragmentContext context,
-      MongoClientOptions clientOptions, MongoCredential credential) {
-    this.clientOptions = clientOptions;
-    this.credential = credential;
-    this.fields = new BasicDBObject();
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  public MongoRecordReader(
+      MongoSubScan.MongoSubScanSpec subScanSpec,
+      List<SchemaPath> projectedColumns,
+      FragmentContext context,
+      MongoStoragePlugin plugin) {
+
+    fields = new BasicDBObject();
     // exclude _id field, if not mentioned by user.
-    this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+    fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
     setColumns(projectedColumns);
-    this.fragmentContext = context;
-    this.filters = new BasicDBObject();
+    fragmentContext = context;
+    this.plugin = plugin;
+    filters = new BasicDBObject();
     Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
         subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
     buildFilters(subScanSpec.getFilter(), mergedFilters);
@@ -94,8 +93,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(
-      Collection<SchemaPath> projectedColumns) {
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
     if (!isStarQuery()) {
       for (SchemaPath column : projectedColumns ) {
@@ -132,19 +130,14 @@ public class MongoRecordReader extends AbstractRecordReader {
   }
 
   private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
-    try {
-      List<String> hosts = subScanSpec.getHosts();
-      List<ServerAddress> addresses = Lists.newArrayList();
-      for (String host : hosts) {
-        addresses.add(new ServerAddress(host));
-      }
-      MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
-          credential);
-      MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
-      collection = db.getCollection(subScanSpec.getCollectionName());
-    } catch (UnknownHostException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+    List<String> hosts = subScanSpec.getHosts();
+    List<ServerAddress> addresses = Lists.newArrayList();
+    for (String host : hosts) {
+      addresses.add(new ServerAddress(host));
     }
+    MongoClient client = plugin.getClient(addresses);
+    MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
+    collection = db.getCollection(subScanSpec.getCollectionName());
   }
 
   @Override
@@ -152,13 +145,18 @@ public class MongoRecordReader extends AbstractRecordReader {
     this.operatorContext = context;
     this.writer = new VectorContainerWriter(output);
     this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
-    logger.info("Filters Applied : " + filters);
-    logger.info("Fields Selected :" + fields);
-    cursor = collection.find(filters).projection(fields).iterator();
+
   }
 
   @Override
   public int next() {
+    if(cursor == null){
+      logger.info("Filters Applied : " + filters);
+      logger.info("Fields Selected :" + fields);
+      cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+    }
+
+
     writer.allocate();
     writer.reset();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 3a8a496..49b1750 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -51,12 +51,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
         if ((columns = subScan.getColumns()) == null) {
           columns = GroupScan.ALL_COLUMNS;
         }
-        MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
-            .getMongoOptions();
-        MongoCredential mongoCrendials = subScan.getMongoPluginConfig()
-            .getMongoCrendials();
-        readers.add(new MongoRecordReader(scanSpec, columns, context,
-            clientOptions, mongoCrendials));
+        readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
       } catch (Exception e) {
         logger.error("MongoRecordReader creation failed for subScan:  "
             + subScan + ".");

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 38bc91d..093df57 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -18,10 +18,13 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.schema.SchemaPlus;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -35,21 +38,36 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
 
 public class MongoStoragePlugin extends AbstractStoragePlugin {
   static final Logger logger = LoggerFactory
       .getLogger(MongoStoragePlugin.class);
 
-  private DrillbitContext context;
-  private MongoStoragePluginConfig mongoConfig;
-  private MongoSchemaFactory schemaFactory;
+  private final DrillbitContext context;
+  private final MongoStoragePluginConfig mongoConfig;
+  private final MongoSchemaFactory schemaFactory;
+  private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
+  private final MongoClientURI clientURI;
 
   public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig,
       DrillbitContext context, String name) throws IOException,
       ExecutionSetupException {
     this.context = context;
     this.mongoConfig = mongoConfig;
+    this.clientURI = new MongoClientURI(this.mongoConfig.getConnection());
+    this.addressClientMap = CacheBuilder.newBuilder()
+        .expireAfterAccess(24, TimeUnit.HOURS)
+        .removalListener(new AddressCloser()).build();
     this.schemaFactory = new MongoSchemaFactory(this, name);
   }
 
@@ -82,4 +100,48 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
   }
 
+
+  private class AddressCloser implements
+      RemovalListener<MongoCnxnKey, MongoClient> {
+    @Override
+    public synchronized void onRemoval(
+        RemovalNotification<MongoCnxnKey, MongoClient> removal) {
+      removal.getValue().close();
+      logger.debug("Closed connection to {}.", removal.getKey().toString());
+    }
+  }
+
+  public MongoClient getClient(String host) {
+    return getClient(Collections.singletonList(new ServerAddress(host)));
+  }
+
+  public MongoClient getClient() {
+    List<String> hosts = clientURI.getHosts();
+    List<ServerAddress> addresses = Lists.newArrayList();
+    for (String host : hosts) {
+      addresses.add(new ServerAddress(host));
+    }
+    return getClient(addresses);
+  }
+
+  public synchronized MongoClient getClient(List<ServerAddress> addresses) {
+    // Take the first replica from the replicated servers
+    final ServerAddress serverAddress = addresses.get(0);
+    final MongoCredential credential = clientURI.getCredentials();
+    String userName = credential == null ? null : credential.getUserName();
+    MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName);
+    MongoClient client = addressClientMap.getIfPresent(key);
+    if (client == null) {
+      if (credential != null) {
+        List<MongoCredential> credentialList = Arrays.asList(credential);
+        client = new MongoClient(addresses, credentialList, clientURI.getOptions());
+      } else {
+        client = new MongoClient(addresses, clientURI.getOptions());
+      }
+      addressClientMap.put(key, client);
+      logger.debug("Created connection to {}.", key.toString());
+      logger.debug("Number of open connections {}.", addressClientMap.size());
+    }
+    return client;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index d453fb9..9d08abd 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.mongo.schema;
 
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
 import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
@@ -46,13 +44,10 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
+import com.mongodb.MongoClient;
 import com.mongodb.MongoException;
-import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoDatabase;
 
 public class MongoSchemaFactory implements SchemaFactory {
@@ -66,23 +61,12 @@ public class MongoSchemaFactory implements SchemaFactory {
   private LoadingCache<String, List<String>> tableNameLoader;
   private final String schemaName;
   private final MongoStoragePlugin plugin;
+  private final MongoClient client;
 
-  private final List<ServerAddress> addresses;
-  private final MongoClientOptions options;
-  private final MongoCredential credential;
-
-  public MongoSchemaFactory(MongoStoragePlugin schema, String schemaName)
-      throws ExecutionSetupException, UnknownHostException {
-    this.plugin = schema;
+  public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+    this.plugin = plugin;
     this.schemaName = schemaName;
-
-    List<String> hosts = plugin.getConfig().getHosts();
-    addresses = Lists.newArrayList();
-    for (String host : hosts) {
-      addresses.add(new ServerAddress(host));
-    }
-    options = plugin.getConfig().getMongoOptions();
-    credential = plugin.getConfig().getMongoCrendials();
+    this.client = plugin.getClient();
 
     databases = CacheBuilder //
         .newBuilder() //
@@ -104,8 +88,7 @@ public class MongoSchemaFactory implements SchemaFactory {
       }
       try {
         List<String> dbNames = new ArrayList<>();
-        MongoCnxnManager.getClient(addresses, options, credential)
-            .listDatabaseNames().into(dbNames);
+        client.listDatabaseNames().into(dbNames);
         return dbNames;
       } catch (MongoException me) {
         logger.warn("Failure while loading databases in Mongo. {}",
@@ -123,8 +106,7 @@ public class MongoSchemaFactory implements SchemaFactory {
     @Override
     public List<String> load(String dbName) throws Exception {
       try {
-        MongoDatabase db = MongoCnxnManager.getClient(addresses, options,
-            credential).getDatabase(dbName);
+        MongoDatabase db = client.getDatabase(dbName);
         List<String> collectionNames = new ArrayList<>();
         db.listCollectionNames().into(collectionNames);
         return collectionNames;