Posted to commits@drill.apache.org by ja...@apache.org on 2015/06/26 05:31:04 UTC

[3/3] drill git commit: DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1

DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5bb75b2d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5bb75b2d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5bb75b2d

Branch: refs/heads/master
Commit: 5bb75b2d1fc23586aff40ebfd93a7ce2084d01ad
Parents: c7c2236
Author: Kamesh <ka...@gmail.com>
Authored: Sat May 23 07:38:26 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 25 19:58:47 2015 -0700

----------------------------------------------------------------------
 contrib/storage-mongo/pom.xml                   |  2 +-
 .../drill/exec/store/mongo/MongoGroupScan.java  | 76 ++++++++++----------
 .../exec/store/mongo/MongoRecordReader.java     | 21 +++---
 .../drill/exec/store/mongo/MongoUtils.java      | 12 ----
 .../store/mongo/schema/MongoSchemaFactory.java  | 21 +++---
 5 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 8168861..ce4ad7e 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
 	  <groupId>org.mongodb</groupId>
 	  <artifactId>mongo-java-driver</artifactId>
-	  <version>2.12.2</version>
+	  <version>3.0.1</version>
 	  <scope>compile</scope>
     </dependency>
 

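The version bump above is the entire pom change; the rest of the patch follows from it, since the 3.x driver replaces the 2.x DB/DBCollection/DBCursor classes with MongoDatabase, MongoCollection<Document> and FindIterable. A minimal sketch of the new entry points (host, port and names below are placeholders, not Drill code):

import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

public class Driver3BasicsSketch {
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost", 27017);
    try {
      // 3.x: getDatabase() replaces getDB() and returns a MongoDatabase
      MongoDatabase db = client.getDatabase("test");
      // collections are typed on org.bson.Document instead of DBObject
      MongoCollection<Document> coll = db.getCollection("sample");
      // find() returns a FindIterable; first() replaces 2.x findOne()
      Document first = coll.find().first();
      System.out.println(first == null ? "<empty>" : first.toJson());
    } finally {
      client.close();
    }
  }
}
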
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index e33d2ae..b7885d3 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
 import org.apache.drill.exec.store.mongo.common.ChunkInfo;
+import org.bson.Document;
 import org.bson.types.MaxKey;
 import org.bson.types.MinKey;
 import org.slf4j.Logger;
@@ -65,15 +66,15 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
-import com.mongodb.CommandResult;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
 
 @JsonTypeName("mongo-scan")
 public class MongoGroupScan extends AbstractGroupScan implements
@@ -165,12 +166,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   private boolean isShardedCluster(MongoClient client) {
-    DB db = client.getDB(scanSpec.getDbName());
-    String msg = db.command("isMaster").getString("msg");
+    MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+    String msg = db.runCommand(new Document("isMaster", 1)).getString("msg");
     return msg == null ? false : msg.equals("isdbgrid");
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes" })
   private void init() throws IOException {
     try {
       List<String> h = storagePluginConfig.getHosts();
@@ -184,35 +185,37 @@ public class MongoGroupScan extends AbstractGroupScan implements
       chunksMapping = Maps.newHashMap();
       chunksInverseMapping = Maps.newLinkedHashMap();
       if (isShardedCluster(client)) {
-        DB db = client.getDB(CONFIG);
-        DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
-        DBObject query = new BasicDBObject(1);
-        query
+        MongoDatabase db = client.getDatabase(CONFIG);
+        MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
+        Document filter = new Document();
+        filter
             .put(
                 NS,
                 this.scanSpec.getDbName() + "."
                     + this.scanSpec.getCollectionName());
 
-        DBObject fields = new BasicDBObject();
-        fields.put(SHARD, select);
-        fields.put(MIN, select);
-        fields.put(MAX, select);
+        Document projection = new Document();
+        projection.put(SHARD, select);
+        projection.put(MIN, select);
+        projection.put(MAX, select);
 
-        DBCursor chunkCursor = chunksCollection.find(query, fields);
+        FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
+        MongoCursor<Document> iterator = chunkCursor.iterator();
 
-        DBCollection shardsCollection = db.getCollectionFromString(SHARDS);
+        MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
 
-        fields = new BasicDBObject();
-        fields.put(HOST, select);
+        projection = new Document();
+        projection.put(HOST, select);
 
-        while (chunkCursor.hasNext()) {
-          DBObject chunkObj = chunkCursor.next();
+        while (iterator.hasNext()) {
+          Document chunkObj = iterator.next();
           String shardName = (String) chunkObj.get(SHARD);
           String chunkId = (String) chunkObj.get(ID);
-          query = new BasicDBObject().append(ID, shardName);
-          DBCursor hostCursor = shardsCollection.find(query, fields);
-          while (hostCursor.hasNext()) {
-            DBObject hostObj = hostCursor.next();
+          filter = new Document(ID, shardName);
+          FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection);
+          MongoCursor<Document> hostIterator = hostCursor.iterator();
+          while (hostIterator.hasNext()) {
+            Document hostObj = hostIterator.next();
             String hostEntry = (String) hostObj.get(HOST);
             String[] tagAndHost = StringUtils.split(hostEntry, '/');
             String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
@@ -300,27 +303,27 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private Set<ServerAddress> getPreferredHosts(MongoClient client,
       List<String> hosts) throws UnknownHostException {
     Set<ServerAddress> addressList = Sets.newHashSet();
-    DB db = client.getDB(scanSpec.getDbName());
+    MongoDatabase db = client.getDatabase(scanSpec.getDbName());
     ReadPreference readPreference = client.getReadPreference();
+    Document command = db.runCommand(new Document("isMaster", 1));
+
     switch (readPreference.getName().toUpperCase()) {
     case "PRIMARY":
     case "PRIMARYPREFERRED":
-      String primaryHost = db.command("isMaster").getString("primary");
+      String primaryHost = command.getString("primary");
       addressList.add(new ServerAddress(primaryHost));
       return addressList;
     case "SECONDARY":
     case "SECONDARYPREFERRED":
-      primaryHost = db.command("isMaster").getString("primary");
-      @SuppressWarnings("unchecked")
-      List<String> hostsList = (List<String>) db.command("isMaster").get(
-          "hosts");
+      primaryHost = command.getString("primary");
+      List<String> hostsList = (List<String>) command.get("hosts");
       hostsList.remove(primaryHost);
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
       return addressList;
     case "NEAREST":
-      hostsList = (List<String>) db.command("isMaster").get("hosts");
+      hostsList = (List<String>) command.get("hosts");
       for (String host : hostsList) {
         addressList.add(new ServerAddress(host));
       }
@@ -475,12 +478,13 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
       MongoClient client = MongoCnxnManager.getClient(addresses,
           clientURI.getOptions(), clientURI.getCredentials());
-      DB db = client.getDB(scanSpec.getDbName());
-      DBCollection collection = db.getCollectionFromString(scanSpec
+      MongoDatabase db = client.getDatabase(scanSpec.getDbName());
+      MongoCollection<Document> collection = db.getCollection(scanSpec
           .getCollectionName());
-      CommandResult stats = collection.getStats();
+      String json = collection.find().first().toJson();
+      float approxDiskCost = json.getBytes().length * collection.count();
       return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
-          stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE));
+          collection.count(), 1, approxDiskCost);
     } catch (Exception e) {
       throw new DrillRuntimeException(e.getMessage(), e);
     }

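The hunks above rework the chunk/shard lookups and the scan statistics for the 3.x API: db.command("isMaster") becomes db.runCommand(new Document("isMaster", 1)), the two-argument find(query, fields) becomes find(filter).projection(projection) on a FindIterable, and, with CommandResult/getStats() gone, the disk cost is approximated from the first document's JSON size times collection.count(). A minimal sketch of the new query shape (collection and field names are placeholders, not Drill code):

import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;

public class ChunkQuerySketch {
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost", 27017);
    try {
      MongoCollection<Document> chunks =
          client.getDatabase("config").getCollection("chunks");

      // 2.x passed the filter and the field selector as two DBObject arguments;
      // 3.x chains the projection onto the FindIterable instead.
      Document filter = new Document("ns", "mydb.mycoll");
      Document projection = new Document("shard", 1).append("min", 1).append("max", 1);

      FindIterable<Document> result = chunks.find(filter).projection(projection);
      MongoCursor<Document> it = result.iterator();
      try {
        while (it.hasNext()) {
          System.out.println(it.next().toJson());
        }
      } finally {
        it.close();
      }
    } finally {
      client.close();
    }
  }
}
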
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 182f5a4..40fc810 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,27 +46,25 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
-import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
 
 public class MongoRecordReader extends AbstractRecordReader {
   static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
-  private DBCollection collection;
-  private DBCursor cursor;
+  private MongoCollection<Document> collection;
+  private MongoCursor<Document> cursor;
 
   private JsonReader jsonReader;
   private VectorContainerWriter writer;
 
   private BasicDBObject filters;
-  private DBObject fields;
+  private BasicDBObject fields;
 
   private MongoClientOptions clientOptions;
   private MongoCredential credential;
@@ -141,7 +140,7 @@ public class MongoRecordReader extends AbstractRecordReader {
       }
       MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions,
           credential);
-      DB db = client.getDB(subScanSpec.getDbName());
+      MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
       collection = db.getCollection(subScanSpec.getCollectionName());
     } catch (UnknownHostException e) {
       throw new DrillRuntimeException(e.getMessage(), e);
@@ -155,7 +154,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
     logger.info("Filters Applied : " + filters);
     logger.info("Fields Selected :" + fields);
-    cursor = collection.find(filters, fields);
+    cursor = collection.find(filters).projection(fields).iterator();
   }
 
   @Override
@@ -170,7 +169,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     try {
       while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) {
         writer.setPosition(docCount);
-        String doc = cursor.next().toString();
+        String doc = cursor.next().toJson();
         jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
         jsonReader.write(writer);
         docCount++;

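The record reader now holds a MongoCursor<Document> obtained via find(filters).projection(fields).iterator(), and serializes each document with toJson() instead of the 2.x DBObject.toString(). A minimal sketch of that read loop, with placeholder database and field names:

import org.bson.Document;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;

public class ReadLoopSketch {
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost", 27017);
    try {
      MongoCollection<Document> coll =
          client.getDatabase("mydb").getCollection("sample");

      // BasicDBObject still works as a filter/projection: it implements Bson in the 3.x driver.
      BasicDBObject filters = new BasicDBObject();
      BasicDBObject fields = new BasicDBObject("name", 1);

      MongoCursor<Document> cursor =
          coll.find(filters).projection(fields).iterator();
      try {
        while (cursor.hasNext()) {
          // toJson() yields a JSON string; the 2.x code relied on DBObject.toString()
          System.out.println(cursor.next().toJson());
        }
      } finally {
        cursor.close();
      }
    } finally {
      client.close();
    }
  }
}
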
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
index b43a22f..a5fb2ad 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
@@ -22,13 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.bson.LazyBSONCallback;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
-import com.mongodb.LazyWriteableDBObject;
 
 public class MongoUtils {
 
@@ -52,14 +48,6 @@ public class MongoUtils {
     return orQueryFilter;
   }
 
-  public static BasicDBObject deserializeFilter(byte[] filterBytes) {
-    DBObject dbo = new LazyWriteableDBObject(filterBytes,
-        new LazyBSONCallback());
-    BasicDBObject result = new BasicDBObject();
-    result.putAll(dbo);
-    return result;
-  }
-
   public static Map<String, List<BasicDBObject>> mergeFilters(
       Map<String, Object> minFilters, Map<String, Object> maxFilters) {
     Map<String, List<BasicDBObject>> filters = Maps.newHashMap();

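The 3.x driver drops LazyWriteableDBObject, so the deserializeFilter helper built on it is removed rather than ported. For illustration only, one possible 3.x equivalent (an assumption, not part of this patch) would wrap the raw BSON bytes in org.bson.RawBsonDocument:

import org.bson.Document;
import org.bson.RawBsonDocument;

public class FilterBytesSketch {
  // Hypothetical 3.x replacement for the removed helper: decode raw BSON bytes
  // into a mutable Document by going through an immutable RawBsonDocument view.
  public static Document deserializeFilter(byte[] filterBytes) {
    RawBsonDocument raw = new RawBsonDocument(filterBytes);
    return Document.parse(raw.toJson());
  }
}
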
http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index fccffb5..d453fb9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -27,10 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Maps;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -50,13 +47,13 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.mongodb.DB;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
 import com.mongodb.MongoException;
-import com.mongodb.ReadPreference;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoDatabase;
 
 public class MongoSchemaFactory implements SchemaFactory {
 
@@ -106,8 +103,10 @@ public class MongoSchemaFactory implements SchemaFactory {
         throw new UnsupportedOperationException();
       }
       try {
-        return MongoCnxnManager.getClient(addresses, options, credential)
-            .getDatabaseNames();
+        List<String> dbNames = new ArrayList<>();
+        MongoCnxnManager.getClient(addresses, options, credential)
+            .listDatabaseNames().into(dbNames);
+        return dbNames;
       } catch (MongoException me) {
         logger.warn("Failure while loading databases in Mongo. {}",
             me.getMessage());
@@ -124,9 +123,11 @@ public class MongoSchemaFactory implements SchemaFactory {
     @Override
     public List<String> load(String dbName) throws Exception {
       try {
-        DB db = MongoCnxnManager.getClient(addresses, options, credential)
-            .getDB(dbName);
-        return new ArrayList<>(db.getCollectionNames());
+        MongoDatabase db = MongoCnxnManager.getClient(addresses, options,
+            credential).getDatabase(dbName);
+        List<String> collectionNames = new ArrayList<>();
+        db.listCollectionNames().into(collectionNames);
+        return collectionNames;
       } catch (MongoException me) {
         logger.warn("Failure while getting collection names from '{}'. {}",
             dbName, me.getMessage());
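
Both schema lookups follow the same 3.x pattern: getDatabaseNames() and DB.getCollectionNames() are replaced by MongoIterable<String> results drained into a list with into(). A minimal sketch of that listing pattern (connection details are placeholders, not Drill code):

import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoClient;

public class ListNamesSketch {
  public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost", 27017);
    try {
      // 3.x: listDatabaseNames() returns a MongoIterable<String> instead of List<String>.
      List<String> dbNames = new ArrayList<>();
      client.listDatabaseNames().into(dbNames);

      // Likewise, MongoDatabase.listCollectionNames() replaces DB.getCollectionNames().
      List<String> collectionNames = new ArrayList<>();
      client.getDatabase("mydb").listCollectionNames().into(collectionNames);

      System.out.println(dbNames);
      System.out.println(collectionNames);
    } finally {
      client.close();
    }
  }
}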