Posted to commits@drill.apache.org by sm...@apache.org on 2014/09/30 09:26:06 UTC

[04/21] git commit: DRILL-98: MongoDB storage plugin

DRILL-98: MongoDB storage plugin

This commit disables MongoDB PStore due to changes to the PStore interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2ca9c907
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2ca9c907
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2ca9c907

Branch: refs/heads/master
Commit: 2ca9c907bff639e08a561eac32e0acab3a0b3304
Parents: 786fd36
Author: Kamesh <ka...@gmail.com>
Authored: Sat Sep 27 18:35:28 2014 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Sun Sep 28 00:10:44 2014 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   1 +
 contrib/storage-mongo/pom.xml                   |  85 +++
 .../exec/store/mongo/DrillMongoConstants.java   |  45 ++
 .../exec/store/mongo/MongoCnxnManager.java      |  73 +++
 .../mongo/MongoCompareFunctionProcessor.java    | 255 +++++++++
 .../exec/store/mongo/MongoFilterBuilder.java    | 219 ++++++++
 .../drill/exec/store/mongo/MongoGroupScan.java  | 538 +++++++++++++++++++
 .../store/mongo/MongoPushDownFilterForScan.java | 103 ++++
 .../exec/store/mongo/MongoRecordReader.java     | 302 +++++++++++
 .../exec/store/mongo/MongoScanBatchCreator.java |  68 +++
 .../drill/exec/store/mongo/MongoScanSpec.java   |  62 +++
 .../exec/store/mongo/MongoStoragePlugin.java    |  88 +++
 .../store/mongo/MongoStoragePluginConfig.java   |  72 +++
 .../drill/exec/store/mongo/MongoSubScan.java    | 214 ++++++++
 .../drill/exec/store/mongo/MongoUtils.java      |  89 +++
 .../exec/store/mongo/common/ChunkInfo.java      |  68 +++
 .../exec/store/mongo/common/MongoCompareOp.java |  34 ++
 .../exec/store/mongo/config/MongoPStore.java    | 190 +++++++
 .../store/mongo/config/MongoPStoreProvider.java |  76 +++
 .../store/mongo/schema/MongoDatabaseSchema.java |  59 ++
 .../store/mongo/schema/MongoSchemaFactory.java  | 189 +++++++
 .../resources/bootstrap-storage-plugins.json    |   9 +
 .../src/main/resources/drill-module.conf        |  28 +
 .../store/mongo/TestMongoChunkAssignment.java   | 274 ++++++++++
 distribution/pom.xml                            |   5 +
 distribution/src/assemble/bin.xml               |   1 +
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/vector/complex/fn/JsonReader.java      |  16 +-
 .../vector/complex/fn/JsonReaderWithState.java  |  22 +
 30 files changed, 3186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 728038a..cbb5598 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -34,6 +34,7 @@
   <modules>
     <module>storage-hbase</module>
     <module>storage-hive</module>
+    <module>storage-mongo</module>
     <module>sqlline</module>
     <module>data</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
new file mode 100644
index 0000000..720bf1e
--- /dev/null
+++ b/contrib/storage-mongo/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>0.6.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-mongo-storage</artifactId>
+
+  <name>contrib/mongo-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>2.12.2</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.1.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>logback.log.dir</name>
+              <value>${project.build.directory}/surefire-reports</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
new file mode 100644
index 0000000..8261e2e
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+public interface DrillMongoConstants {
+
+  public static final String SYS_STORE_PROVIDER_MONGO_URL = "drill.exec.sys.store.provider.mongo.url";
+
+  public static final String ID = "_id";
+
+  public static final String SHARDS = "shards";
+
+  public static final String NS = "ns";
+
+  public static final String SHARD = "shard";
+
+  public static final String HOST = "host";
+
+  public static final String CHUNKS = "chunks";
+
+  public static final String SIZE = "size";
+
+  public static final String COUNT = "count";
+
+  public static final String CONFIG = "config";
+
+  public static final String MIN = "min";
+
+  public static final String MAX = "max";
+}
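
For reference, these constants are the keys MongoGroupScan uses when it reads
the config database of a sharded cluster. A standalone sketch of the
config.chunks document shape those keys address (hypothetical class name and
values, not part of this commit):

    import com.mongodb.BasicDBObject;

    public class ChunkDocShape {
      public static void main(String[] args) {
        // Hypothetical config.chunks entry, keyed by the constants above:
        // ID ("_id"), NS ("ns"), SHARD ("shard"), MIN ("min"), MAX ("max").
        BasicDBObject chunk = new BasicDBObject()
            .append("_id", "test.employee-empid_1000")
            .append("ns", "test.employee")                    // db.collection
            .append("shard", "shard0001")                     // owning shard
            .append("min", new BasicDBObject("empid", 1000))  // lower bound
            .append("max", new BasicDBObject("empid", 2000)); // upper bound
        System.out.println(chunk);
      }
    }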

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
new file mode 100644
index 0000000..a4482dd
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.ServerAddress;
+
+public class MongoCnxnManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(MongoCnxnManager.class);
+  private static Cache<ServerAddress, MongoClient> addressClientMap;
+
+  static {
+    addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
+        .expireAfterAccess(10, TimeUnit.MINUTES)
+        .removalListener(new AddressCloser()).build();
+  }
+
+  private static class AddressCloser implements
+      RemovalListener<ServerAddress, MongoClient> {
+    @Override
+    public synchronized void onRemoval(
+        RemovalNotification<ServerAddress, MongoClient> removal) {
+      removal.getValue().close();
+      logger.debug("Closed connection to {}.", removal.getKey().toString());
+    }
+  }
+
+  public synchronized static MongoClient getClient(
+      List<ServerAddress> addresses, MongoClientOptions clientOptions)
+      throws UnknownHostException {
+    // Take the first replica from the replicated servers
+    ServerAddress serverAddress = addresses.get(0);
+    MongoClient client = addressClientMap.getIfPresent(serverAddress);
+    if (client == null) {
+      client = new MongoClient(addresses, clientOptions);
+      addressClientMap.put(serverAddress, client);
+      logger.debug("Created connection to {}.", serverAddress.toString());
+      logger.debug("Number of connections opened are {}.",
+          addressClientMap.size());
+    }
+    return client;
+  }
+}
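
Readers obtain clients through this cache instead of constructing them
directly. A minimal usage sketch (hypothetical class name and host list, not
part of this commit), showing that repeated lookups keyed on the first
address reuse a single client:

    import java.net.UnknownHostException;
    import java.util.List;

    import org.apache.drill.exec.store.mongo.MongoCnxnManager;

    import com.google.common.collect.Lists;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientOptions;
    import com.mongodb.ServerAddress;

    public class CnxnManagerUsage {
      public static void main(String[] args) throws UnknownHostException {
        // Hypothetical server list; the plugin gets these from the sub-scan spec.
        List<ServerAddress> addresses = Lists.newArrayList(
            new ServerAddress("localhost", 27017));
        MongoClientOptions options = MongoClientOptions.builder().build();

        // The cache is keyed on the first address, so the second call returns
        // the same client; idle entries are closed after ten minutes.
        MongoClient first = MongoCnxnManager.getClient(addresses, options);
        MongoClient second = MongoCnxnManager.getClient(addresses, options);
        System.out.println(first == second); // true
      }
    }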

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
new file mode 100644
index 0000000..d588f1e
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class MongoCompareFunctionProcessor extends
+    AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+  private Object value;
+  private boolean success;
+  private boolean isEqualityFn;
+  private SchemaPath path;
+  private String functionName;
+
+  public static boolean isCompareFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  public static MongoCompareFunctionProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1)
+        : null;
+    MongoCompareFunctionProcessor evaluator = new MongoCompareFunctionProcessor(
+        functionName);
+
+    if (valueArg != null) { // binary function
+      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+        LogicalExpression swapArg = valueArg;
+        valueArg = nameArg;
+        nameArg = swapArg;
+        evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+            .get(functionName);
+      }
+      evaluator.success = nameArg.accept(evaluator, valueArg);
+    } else if (call.args.get(0) instanceof SchemaPath) {
+      evaluator.success = true;
+      evaluator.path = (SchemaPath) nameArg;
+    }
+
+    return evaluator;
+  }
+
+  public MongoCompareFunctionProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+        .containsKey(functionName)
+        && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(
+            functionName);
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public SchemaPath getPath() {
+    return path;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitCastExpression(CastExpression e,
+      LogicalExpression valueArg) throws RuntimeException {
+    if (e.getInput() instanceof CastExpression
+        || e.getInput() instanceof SchemaPath) {
+      return e.getInput().accept(this, valueArg);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitConvertExpression(ConvertExpression e,
+      LogicalExpression valueArg) throws RuntimeException {
+    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM
+        && e.getInput() instanceof SchemaPath) {
+      String encodingType = e.getEncodingType();
+      switch (encodingType) {
+      case "INT_BE":
+      case "INT":
+      case "UINT_BE":
+      case "UINT":
+      case "UINT4_BE":
+      case "UINT4":
+        if (valueArg instanceof IntExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          this.value = ((IntExpression) valueArg).getInt();
+        }
+        break;
+      case "BIGINT_BE":
+      case "BIGINT":
+      case "UINT8_BE":
+      case "UINT8":
+        if (valueArg instanceof LongExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          this.value = ((LongExpression) valueArg).getLong();
+        }
+        break;
+      case "FLOAT":
+        if (valueArg instanceof FloatExpression && isEqualityFn) {
+          this.value = ((FloatExpression) valueArg).getFloat();
+        }
+        break;
+      case "DOUBLE":
+        if (valueArg instanceof DoubleExpression && isEqualityFn) {
+          this.value = ((DoubleExpression) valueArg).getDouble();
+        }
+        break;
+      case "TIME_EPOCH":
+      case "TIME_EPOCH_BE":
+        if (valueArg instanceof TimeExpression) {
+          this.value = ((TimeExpression) valueArg).getTime();
+        }
+        break;
+      case "DATE_EPOCH":
+      case "DATE_EPOCH_BE":
+        if (valueArg instanceof DateExpression) {
+          this.value = ((DateExpression) valueArg).getDate();
+        }
+        break;
+      case "BOOLEAN_BYTE":
+        if (valueArg instanceof BooleanExpression) {
+          this.value = ((BooleanExpression) valueArg).getBoolean();
+        }
+        break;
+      case "UTF8":
+        // let visitSchemaPath() handle this.
+        return e.getInput().accept(this, valueArg);
+      }
+
+      if (value != null) {
+        this.path = (SchemaPath) e.getInput();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg)
+      throws RuntimeException {
+    return false;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg)
+      throws RuntimeException {
+    if (valueArg instanceof QuotedString) {
+      this.value = ((QuotedString) valueArg).value;
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof IntExpression) {
+      this.value = ((IntExpression) valueArg).getInt();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof LongExpression) {
+      this.value = ((LongExpression) valueArg).getLong();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof FloatExpression) {
+      this.value = ((FloatExpression) valueArg).getFloat();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof DoubleExpression) {
+      this.value = ((DoubleExpression) valueArg).getDouble();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof BooleanExpression) {
+      this.value = ((BooleanExpression) valueArg).getBoolean();
+      this.path = path;
+      return true;
+    }
+
+    return false;
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet
+        .builder();
+    VALUE_EXPRESSION_CLASSES = builder.add(BooleanExpression.class)
+        .add(DateExpression.class).add(DoubleExpression.class)
+        .add(FloatExpression.class).add(IntExpression.class)
+        .add(LongExpression.class).add(QuotedString.class)
+        .add(TimeExpression.class).build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // unary functions
+        .put("isnotnull", "isnotnull")
+        .put("isNotNull", "isNotNull")
+        .put("is not null", "is not null")
+        .put("isnull", "isnull")
+        .put("isNull", "isNull")
+        .put("is null", "is null")
+        // binary functions
+        .put("equal", "equal").put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than").build();
+  }
+
+}
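
A standalone illustration (not plugin code) of why the transpose map exists:
when a binary predicate arrives literal-first, process() swaps the arguments
so the column becomes the path, and the comparison must flip to preserve the
predicate's meaning:

    public class TransposeIllustration {
      public static void main(String[] args) {
        // In "10 < empid" the literal is the first argument; after swapping,
        // the column comes first, so less_than must become greater_than for
        // the predicate to keep its meaning: "empid > 10".
        System.out.println(transpose("less_than")); // greater_than
      }

      // Mirrors the binary entries of COMPARE_FUNCTIONS_TRANSPOSE_MAP.
      static String transpose(String fn) {
        switch (fn) {
        case "less_than":                return "greater_than";
        case "greater_than":             return "less_than";
        case "less_than_or_equal_to":    return "greater_than_or_equal_to";
        case "greater_than_or_equal_to": return "less_than_or_equal_to";
        default:                         return fn; // equal, not_equal, unary
        }
      }
    }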

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
new file mode 100644
index 0000000..def793a
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.mongodb.BasicDBObject;
+
+public class MongoFilterBuilder extends
+    AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+    DrillMongoConstants {
+  static final Logger logger = LoggerFactory
+      .getLogger(MongoFilterBuilder.class);
+  final MongoGroupScan groupScan;
+  final LogicalExpression le;
+  private boolean allExpressionsConverted = true;
+
+  public MongoFilterBuilder(MongoGroupScan groupScan,
+      LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+  }
+
+  public MongoScanSpec parseTree() {
+    MongoScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getScanSpec(),
+          parsedSpec);
+    }
+    return parsedSpec;
+  }
+
+  private MongoScanSpec mergeScanSpecs(String functionName,
+      MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+    BasicDBObject newFilter = null;
+
+    switch (functionName) {
+    case "booleanAnd":
+      if (leftScanSpec.getFilters() != null
+          && rightScanSpec.getFilters() != null) {
+        newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
+            rightScanSpec.getFilters());
+      } else if (leftScanSpec.getFilters() != null) {
+        newFilter = leftScanSpec.getFilters();
+      } else {
+        newFilter = rightScanSpec.getFilters();
+      }
+      break;
+    case "booleanOr":
+      newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
+          rightScanSpec.getFilters());
+    }
+    return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
+        .getScanSpec().getCollectionName(), newFilter);
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  @Override
+  public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+    List<LogicalExpression> args = op.args;
+    MongoScanSpec nodeScanSpec = null;
+    String functionName = op.getName();
+    for (int i = 0; i < args.size(); ++i) {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        if (nodeScanSpec == null) {
+          nodeScanSpec = args.get(i).accept(this, null);
+        } else {
+          MongoScanSpec scanSpec = args.get(i).accept(this, null);
+          if (scanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+          } else {
+            allExpressionsConverted = false;
+          }
+        }
+        break;
+      }
+    }
+    return nodeScanSpec;
+  }
+
+  @Override
+  public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+    MongoScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    ImmutableList<LogicalExpression> args = call.args;
+
+    if (MongoCompareFunctionProcessor.isCompareFunction(functionName)) {
+      MongoCompareFunctionProcessor processor = MongoCompareFunctionProcessor
+          .process(call);
+      if (processor.isSuccess()) {
+        try {
+          nodeScanSpec = createMongoScanSpec(processor.getFunctionName(),
+              processor.getPath(), processor.getValue());
+        } catch (Exception e) {
+          logger.error(" Failed to creare Filter ", e);
+          // throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    } else {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
+        MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
+        if (leftScanSpec != null && rightScanSpec != null) {
+          nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
+              rightScanSpec);
+        } else {
+          allExpressionsConverted = false;
+          if ("booleanAnd".equals(functionName)) {
+            nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+          }
+        }
+        break;
+      }
+    }
+
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+
+    return nodeScanSpec;
+  }
+
+  private MongoScanSpec createMongoScanSpec(String functionName,
+      SchemaPath field, Object fieldValue) throws ClassNotFoundException,
+      IOException {
+    // extract the field name
+    String fieldName = field.getAsUnescapedPath();
+    MongoCompareOp compareOp = null;
+    switch (functionName) {
+    case "equal":
+      compareOp = MongoCompareOp.EQUAL;
+      break;
+    case "not_equal":
+      compareOp = MongoCompareOp.NOT_EQUAL;
+      break;
+    case "greater_than_or_equal_to":
+      compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+      break;
+    case "greater_than":
+      compareOp = MongoCompareOp.GREATER;
+      break;
+    case "less_than_or_equal_to":
+      compareOp = MongoCompareOp.LESS_OR_EQUAL;
+      break;
+    case "less_than":
+      compareOp = MongoCompareOp.LESS;
+      break;
+    case "isnull":
+    case "isNull":
+    case "is null":
+      compareOp = MongoCompareOp.IFNULL;
+      break;
+    case "isnotnull":
+    case "isNotNull":
+    case "is not null":
+      compareOp = MongoCompareOp.IFNOTNULL;
+      break;
+    }
+
+    if (compareOp != null) {
+      BasicDBObject queryFilter = new BasicDBObject();
+      if (compareOp == MongoCompareOp.IFNULL) {
+        queryFilter.put(fieldName,
+            new BasicDBObject(MongoCompareOp.EQUAL.getCompareOp(), null));
+      } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+        queryFilter.put(fieldName,
+            new BasicDBObject(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+      } else {
+        queryFilter.put(fieldName, new BasicDBObject(compareOp.getCompareOp(),
+            fieldValue));
+      }
+      return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
+          .getScanSpec().getCollectionName(), queryFilter);
+    }
+    return null;
+  }
+
+}
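
The scan specs built here carry plain BasicDBObject query documents. A sketch
of the filter createMongoScanSpec() produces for a predicate such as
"age < 21", assuming MongoCompareOp.LESS carries the Mongo operator "$lt"
(MongoCompareOp is defined elsewhere in this commit):

    import com.mongodb.BasicDBObject;

    public class FilterShape {
      public static void main(String[] args) {
        // Equivalent of the queryFilter built for "age < 21", assuming
        // MongoCompareOp.LESS.getCompareOp() returns "$lt".
        BasicDBObject queryFilter = new BasicDBObject();
        queryFilter.put("age", new BasicDBObject("$lt", 21));
        System.out.println(queryFilter); // { "age" : { "$lt" : 21}}
      }
    }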

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
new file mode 100644
index 0000000..ccce3d5
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.common.ChunkInfo;
+import org.bson.types.MaxKey;
+import org.bson.types.MinKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+
+@JsonTypeName("mongo-scan")
+public class MongoGroupScan extends AbstractGroupScan implements
+    DrillMongoConstants {
+
+  private static final Integer select = Integer.valueOf(1);
+
+  static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+
+  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
+    @Override
+    public int compare(List<MongoSubScanSpec> list1,
+        List<MongoSubScanSpec> list2) {
+      return list1.size() - list2.size();
+    }
+  };
+
+  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections
+      .reverseOrder(LIST_SIZE_COMPARATOR);
+
+  private MongoStoragePlugin storagePlugin;
+
+  private MongoStoragePluginConfig storagePluginConfig;
+
+  private MongoScanSpec scanSpec;
+
+  private List<SchemaPath> columns;
+
+  private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+
+  // With sharded clusters backed by replica sets, this holds all of the
+  // replica server addresses for each chunk.
+  private Map<String, Set<ServerAddress>> chunksMapping;
+
+  private Map<String, List<ChunkInfo>> chunksInverseMapping;
+
+  private Stopwatch watch = new Stopwatch();
+
+  private boolean filterPushedDown = false;
+
+  @JsonCreator
+  public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
+      @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
+      ExecutionSetupException {
+    this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+        scanSpec, columns);
+  }
+
+  public MongoGroupScan(MongoStoragePlugin storagePlugin,
+      MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+    this.storagePlugin = storagePlugin;
+    this.storagePluginConfig = storagePlugin.getConfig();
+    this.scanSpec = scanSpec;
+    this.columns = columns;
+    this.storagePluginConfig.getConnection();
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that
+   *          The MongoGroupScan to clone
+   */
+  private MongoGroupScan(MongoGroupScan that) {
+    this.scanSpec = that.scanSpec;
+    this.columns = that.columns;
+    this.storagePlugin = that.storagePlugin;
+    this.storagePluginConfig = that.storagePluginConfig;
+    this.chunksMapping = that.chunksMapping;
+    this.chunksInverseMapping = that.chunksInverseMapping;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.filterPushedDown = that.filterPushedDown;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void setFilterPushedDown(boolean filterPushedDown) {
+    this.filterPushedDown = filterPushedDown;
+  }
+
+  private boolean isShardedCluster(MongoClient client) {
+    // TODO: find a more reliable way of identifying a sharded cluster.
+    List<String> databaseNames = client.getDatabaseNames();
+    return databaseNames.contains(CONFIG);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void init() throws IOException {
+    MongoClient client = null;
+    try {
+      MongoClientURI clientURI = new MongoClientURI(
+          this.storagePluginConfig.getConnection());
+      client = new MongoClient(clientURI);
+
+      chunksMapping = Maps.newHashMap();
+      chunksInverseMapping = Maps.newLinkedHashMap();
+      if (isShardedCluster(client)) {
+        DB db = client.getDB(CONFIG);
+        db.setReadPreference(ReadPreference.nearest());
+        DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
+
+        DBObject query = new BasicDBObject(1);
+        query.put(NS, this.scanSpec.getDbName() + "."
+            + this.scanSpec.getCollectionName());
+
+        DBObject fields = new BasicDBObject();
+        fields.put(SHARD, select);
+        fields.put(MIN, select);
+        fields.put(MAX, select);
+
+        DBCursor chunkCursor = chunksCollection.find(query, fields);
+
+        DBCollection shardsCollection = db.getCollectionFromString(SHARDS);
+
+        fields = new BasicDBObject();
+        fields.put(HOST, select);
+
+        while (chunkCursor.hasNext()) {
+          DBObject chunkObj = chunkCursor.next();
+          String shardName = (String) chunkObj.get(SHARD);
+          String chunkId = (String) chunkObj.get(ID);
+          query = new BasicDBObject().append(ID, shardName);
+          DBCursor hostCursor = shardsCollection.find(query, fields);
+          while (hostCursor.hasNext()) {
+            DBObject hostObj = hostCursor.next();
+            String hostEntry = (String) hostObj.get(HOST);
+            String[] tagAndHost = StringUtils.split(hostEntry, '/');
+            String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
+                tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
+            Set<ServerAddress> addressList = chunksMapping.get(chunkId);
+            if (addressList == null) {
+              addressList = Sets.newHashSet();
+              chunksMapping.put(chunkId, addressList);
+            }
+            for (String host : hosts) {
+              addressList.add(new ServerAddress(host));
+            }
+            ServerAddress address = addressList.iterator().next();
+
+            List<ChunkInfo> chunkList = chunksInverseMapping.get(address
+                .getHost());
+            if (chunkList == null) {
+              chunkList = Lists.newArrayList();
+              chunksInverseMapping.put(address.getHost(), chunkList);
+            }
+            ChunkInfo chunkInfo = new ChunkInfo(Arrays.asList(hosts), chunkId);
+            DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
+
+            Map<String, Object> minFilters = Maps.newHashMap();
+            Map minMap = minObj.toMap();
+            Set keySet = minMap.keySet();
+            for (Object keyObj : keySet) {
+              Object object = minMap.get(keyObj);
+              if (!(object instanceof MinKey)) {
+                minFilters.put(keyObj.toString(), object);
+              }
+            }
+            chunkInfo.setMinFilters(minFilters);
+
+            DBObject maxObj = (BasicDBObject) chunkObj.get(MAX);
+            Map<String, Object> maxFilters = Maps.newHashMap();
+            Map maxMap = maxObj.toMap();
+            keySet = maxMap.keySet();
+            for (Object keyObj : keySet) {
+              Object object = maxMap.get(keyObj);
+              if (!(object instanceof MaxKey)) {
+                maxFilters.put(keyObj.toString(), object);
+              }
+            }
+
+            chunkInfo.setMaxFilters(maxFilters);
+            chunkList.add(chunkInfo);
+          }
+        }
+      } else {
+        String chunkName = scanSpec.getDbName() + "."
+            + scanSpec.getCollectionName();
+        List<String> hosts = clientURI.getHosts();
+        Set<ServerAddress> addressList = Sets.newHashSet();
+
+        for (String host : hosts) {
+          addressList.add(new ServerAddress(host));
+        }
+        chunksMapping.put(chunkName, addressList);
+
+        String host = hosts.get(0);
+        ServerAddress address = new ServerAddress(host);
+        ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
+        chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
+        chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+        List<ChunkInfo> chunksList = Lists.newArrayList();
+        chunksList.add(chunkInfo);
+        chunksInverseMapping.put(address.getHost(), chunksList);
+      }
+    } catch (UnknownHostException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    MongoGroupScan clone = new MongoGroupScan(this);
+    clone.columns = columns;
+    return clone;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints)
+      throws PhysicalOperatorSetupException {
+    logger.debug("Incoming endpoints :" + endpoints);
+    watch.reset();
+    watch.start();
+
+    final int numSlots = endpoints.size();
+    int totalAssignmentsTobeDone = chunksMapping.size();
+
+    Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, String
+        .format("Incoming endpoints %d is greater than number of chunks %d",
+            numSlots, totalAssignmentsTobeDone));
+
+    final int minPerEndpointSlot = (int) Math
+        .floor((double) totalAssignmentsTobeDone / numSlots);
+    final int maxPerEndpointSlot = (int) Math
+        .ceil((double) totalAssignmentsTobeDone / numSlots);
+
+    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+    for (int i = 0; i < numSlots; ++i) {
+      endpointFragmentMapping.put(i, new ArrayList<MongoSubScanSpec>(
+          maxPerEndpointSlot));
+      String hostname = endpoints.get(i).getAddress();
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+      if (hostIndexQueue == null) {
+        hostIndexQueue = Lists.newLinkedList();
+        endpointHostIndexListMap.put(hostname, hostIndexQueue);
+      }
+      hostIndexQueue.add(i);
+    }
+
+    Set<Entry<String, List<ChunkInfo>>> chunksToAssignSet = Sets
+        .newHashSet(chunksInverseMapping.entrySet());
+
+    for (Iterator<Entry<String, List<ChunkInfo>>> chunksIterator = chunksToAssignSet
+        .iterator(); chunksIterator.hasNext();) {
+      Entry<String, List<ChunkInfo>> chunkEntry = chunksIterator.next();
+      Queue<Integer> slots = endpointHostIndexListMap.get(chunkEntry.getKey());
+      if (slots != null) {
+        for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
+          Integer slotIndex = slots.poll();
+          List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+              .get(slotIndex);
+          subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
+          slots.offer(slotIndex);
+        }
+        chunksIterator.remove();
+      }
+    }
+
+    PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+        numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+        numSlots, LIST_SIZE_COMPARATOR_REV);
+    for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+      if (listOfScan.size() < minPerEndpointSlot) {
+        minHeap.offer(listOfScan);
+      } else if (listOfScan.size() > minPerEndpointSlot) {
+        maxHeap.offer(listOfScan);
+      }
+    }
+
+    if (chunksToAssignSet.size() > 0) {
+      for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
+        for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
+          List<MongoSubScanSpec> smallestList = minHeap.poll();
+          smallestList.add(buildSubScanSpecAndGet(chunkInfo));
+          minHeap.offer(smallestList);
+        }
+      }
+    }
+
+    while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
+      List<MongoSubScanSpec> smallestList = minHeap.poll();
+      List<MongoSubScanSpec> largestList = maxHeap.poll();
+      smallestList.add(largestList.remove(largestList.size() - 1));
+      if (largestList.size() > minPerEndpointSlot) {
+        maxHeap.offer(largestList);
+      }
+      if (smallestList.size() < minPerEndpointSlot) {
+        minHeap.offer(smallestList);
+      }
+    }
+
+    logger.debug(
+        "Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+        watch.elapsed(TimeUnit.NANOSECONDS) / 1000, endpoints,
+        endpointFragmentMapping.toString());
+  }
+
+  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    MongoSubScanSpec subScanSpec = new MongoSubScanSpec()
+        .setDbName(scanSpec.getDbName())
+        .setCollectionName(scanSpec.getCollectionName())
+        .setHosts(chunkInfo.getChunkLocList())
+        .setMinFilters(chunkInfo.getMinFilters())
+        .setMaxFilters(chunkInfo.getMaxFilters())
+        .setFilter(scanSpec.getFilters());
+    return subScanSpec;
+  }
+
+  @Override
+  public MongoSubScan getSpecificScan(int minorFragmentId)
+      throws ExecutionSetupException {
+    return new MongoSubScan(storagePlugin, storagePluginConfig,
+        endpointFragmentMapping.get(minorFragmentId), columns);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return chunksMapping.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    MongoClientURI clientURI = new MongoClientURI(
+        this.storagePluginConfig.getConnection());
+    try {
+      List<String> hosts = clientURI.getHosts();
+      List<ServerAddress> addresses = Lists.newArrayList();
+      for (String host : hosts) {
+        addresses.add(new ServerAddress(host));
+      }
+      MongoClient client = MongoCnxnManager.getClient(addresses,
+          clientURI.getOptions());
+      DB db = client.getDB(scanSpec.getDbName());
+      db.setReadPreference(ReadPreference.nearest());
+      DBCollection collection = db.getCollectionFromString(scanSpec
+          .getCollectionName());
+      CommandResult stats = collection.getStats();
+      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
+          stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE));
+    } catch (Exception e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MongoGroupScan(this);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    watch.reset();
+    watch.start();
+
+    Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
+    for (DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) {
+      endpointMap.put(endpoint.getAddress(), endpoint);
+      logger.debug("Endpoint address: {}", endpoint.getAddress());
+    }
+
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
+    // For now, only the first matching replica is considered, though each
+    // chunk may have multiple replicas.
+    for (Set<ServerAddress> addressList : chunksMapping.values()) {
+      // A chunk's replicas can live on multiple machines; take the first
+      // address that maps to a drillbit endpoint and credit the affinity to it.
+      for (ServerAddress address : addressList) {
+        DrillbitEndpoint ep = endpointMap.get(address.getHost());
+        if (ep != null) {
+          EndpointAffinity affinity = affinityMap.get(ep);
+          if (affinity == null) {
+            affinityMap.put(ep, new EndpointAffinity(ep, 1));
+          } else {
+            affinity.addAffinity(1);
+          }
+          break;
+        }
+      }
+    }
+    logger.debug("Took {} µs to get operator affinity",
+        watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+    logger.debug("Affined drillbits : " + affinityMap.values());
+    return Lists.newArrayList(affinityMap.values());
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("mongoScanSpec")
+  public MongoScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @JsonProperty("storage")
+  public MongoStoragePluginConfig getStorageConfig() {
+    return storagePluginConfig;
+  }
+
+  @JsonIgnore
+  public MongoStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  @Override
+  public String toString() {
+    return "MongoGroupScan [MongoScanSpec=" + scanSpec + ", columns=" + columns
+        + "]";
+  }
+
+  @VisibleForTesting
+  MongoGroupScan() {
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  void setChunksMapping(Map<String, Set<ServerAddress>> chunksMapping) {
+    this.chunksMapping = chunksMapping;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  void setScanSpec(MongoScanSpec scanSpec) {
+    this.scanSpec = scanSpec;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+    this.chunksInverseMapping = chunksInverseMapping;
+  }
+
+}
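
applyAssignments() bounds each minor fragment's share of the chunks with the
floor/ceil pair computed above, then uses the two heaps to move sub-scans
from overloaded fragments to underloaded ones until every fragment falls
inside those bounds. A worked example of the arithmetic (hypothetical class
name, not part of this commit):

    public class AssignmentBounds {
      public static void main(String[] args) {
        // With 7 chunks and 3 endpoints, every minor fragment ends up with
        // between floor(7/3) = 2 and ceil(7/3) = 3 sub-scans.
        int totalChunks = 7;
        int numSlots = 3;
        int minPerSlot = (int) Math.floor((double) totalChunks / numSlots);
        int maxPerSlot = (int) Math.ceil((double) totalChunks / numSlots);
        System.out.println(minPerSlot + " to " + maxPerSlot); // 2 to 3
      }
    }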

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
new file mode 100644
index 0000000..9af49b1
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
+  private static final Logger logger = LoggerFactory
+      .getLogger(MongoPushDownFilterForScan.class);
+  public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownFilterForScan();
+
+  private MongoPushDownFilterForScan() {
+    super(
+        RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+        "MongoPushDownFilterForScan");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    final FilterPrel filter = (FilterPrel) call.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
+    if (groupScan.isFilterPushedDown()) {
+      return;
+    }
+
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(), scan, condition);
+    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
+        conditionExp);
+    MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
+    if (newScanSpec == null) {
+      return; // no filter pushdown so nothing to apply.
+    }
+
+    MongoGroupScan newGroupsScan = null;
+    try {
+      newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
+          newScanSpec, groupScan.getColumns());
+    } catch (IOException e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+    newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
+        newGroupsScan, scan.getRowType());
+    if (mongoFilterBuilder.isAllExpressionsConverted()) {
+      /*
+       * Since we could convert the entire filter condition expression into a
+       * Mongo filter, we can eliminate the filter operator altogether.
+       */
+      call.transformTo(newScanPrel);
+    } else {
+      call.transformTo(filter.copy(filter.getTraitSet(),
+          ImmutableList.of((RelNode) newScanPrel)));
+    }
+
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    if (scan.getGroupScan() instanceof MongoGroupScan) {
+      return super.matches(call);
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
new file mode 100644
index 0000000..ad4e119
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+
+public class MongoRecordReader extends AbstractRecordReader {
+  static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
+
+  private static final int TARGET_RECORD_COUNT = 3000;
+
+  private DBCollection collection;
+  private DBCursor cursor;
+
+  private NullableVarCharVector valueVector;
+
+  private JsonReaderWithState jsonReaderWithState;
+  private VectorContainerWriter writer;
+  private List<SchemaPath> columns;
+
+  private BasicDBObject filters;
+  private DBObject fields;
+
+  private MongoClientOptions clientOptions;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+
+  private Boolean enableAllTextMode;
+
+  public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
+      List<SchemaPath> projectedColumns, FragmentContext context,
+      MongoClientOptions clientOptions) {
+    this.clientOptions = clientOptions;
+    this.fields = new BasicDBObject();
+    // Exclude the _id field unless the user explicitly projects it.
+    this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+    this.columns = projectedColumns;
+    setColumns(projectedColumns);
+    transformColumns(projectedColumns);
+    this.fragmentContext = context;
+    this.filters = new BasicDBObject();
+    Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
+        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    buildFilters(subScanSpec.getFilter(), mergedFilters);
+    enableAllTextMode = fragmentContext.getDrillbitContext().getOptionManager()
+        .getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
+    init(subScanSpec);
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(
+      Collection<SchemaPath> projectedColumns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (!isStarQuery()) {
+      Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
+      while (columnIterator.hasNext()) {
+        SchemaPath column = columnIterator.next();
+        NameSegment root = column.getRootSegment();
+        String fieldName = root.getPath();
+        transformed.add(SchemaPath.getSimplePath(fieldName));
+        this.fields.put(fieldName, Integer.valueOf(1));
+      }
+    }
+    return transformed;
+  }
+
+  private void buildFilters(BasicDBObject pushdownFilters,
+      Map<String, List<BasicDBObject>> mergedFilters) {
+    for (Entry<String, List<BasicDBObject>> entry : mergedFilters.entrySet()) {
+      List<BasicDBObject> list = entry.getValue();
+      if (list.size() == 1) {
+        this.filters.putAll(list.get(0).toMap());
+      } else {
+        BasicDBObject andQueryFilter = new BasicDBObject();
+        andQueryFilter.put("$and", list);
+        this.filters.putAll(andQueryFilter.toMap());
+      }
+    }
+    if (pushdownFilters != null && !pushdownFilters.toMap().isEmpty()) {
+      if (!mergedFilters.isEmpty()) {
+        this.filters = MongoUtils.andFilterAtIndex(this.filters,
+            pushdownFilters);
+      } else {
+        this.filters = pushdownFilters;
+      }
+    }
+  }
+
+  private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+    try {
+      List<String> hosts = subScanSpec.getHosts();
+      List<ServerAddress> addresses = Lists.newArrayList();
+      for (String host : hosts) {
+        addresses.add(new ServerAddress(host));
+      }
+      MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
+      DB db = client.getDB(subScanSpec.getDbName());
+      db.setReadPreference(ReadPreference.nearest());
+      collection = db.getCollection(subScanSpec.getCollectionName());
+    } catch (UnknownHostException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    if (isStarQuery()) {
+      try {
+        SchemaPath starColumn = SchemaPath.getSimplePath("*");
+        MaterializedField field = MaterializedField.create(starColumn,
+            Types.optional(MinorType.VARCHAR));
+        valueVector = output.addField(field, NullableVarCharVector.class);
+      } catch (SchemaChangeException e) {
+        throw new ExecutionSetupException(e);
+      }
+    } else {
+      try {
+        this.writer = new VectorContainerWriter(output);
+        this.jsonReaderWithState = new JsonReaderWithState(
+            fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
+      } catch (IOException e) {
+        throw new ExecutionSetupException(
+            "Failure in Mongo JsonReader initialization.", e);
+      }
+    }
+    logger.info("Filters Applied : " + filters);
+    logger.info("Fields Selected :" + fields);
+    cursor = collection.find(filters, fields);
+  }
+
+  private int handleNonStarQuery() {
+    writer.allocate();
+    writer.reset();
+
+    int docCount = 0;
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    int rowCount = 0;
+
+    try {
+      String errMsg = "Document {} is too big to fit into allocated ValueVector";
+      done: for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
+        writer.setPosition(docCount);
+        String doc = cursor.next().toString();
+        byte[] record = doc.getBytes(Charsets.UTF_8);
+        switch (jsonReaderWithState.write(record, writer)) {
+        case WRITE_SUCCEED:
+          docCount++;
+          break;
+
+        case WRITE_FAILED:
+          if (docCount == 0) {
+            // errMsg is an slf4j template; format the message explicitly
+            // here, since the exception will not substitute the {}.
+            throw new DrillRuntimeException(
+                "Document is too big to fit into the allocated ValueVector: " + doc);
+          }
+          logger.warn(errMsg, doc);
+          break done;
+
+        default:
+          break done;
+        }
+      }
+
+      for (SchemaPath sp : jsonReaderWithState.getNullColumns()) {
+        PathSegment root = sp.getRootSegment();
+        BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
+        if (root.getChild() != null && !root.getChild().isArray()) {
+          fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
+          while (root.getChild().getChild() != null
+              && !root.getChild().isArray()) {
+            fieldWriter = fieldWriter.map(root.getChild().getNameSegment()
+                .getPath());
+            root = root.getChild();
+          }
+          fieldWriter.integer(root.getChild().getNameSegment().getPath());
+        } else {
+          fieldWriter.integer(root.getNameSegment().getPath());
+        }
+      }
+
+      writer.setValueCount(docCount);
+      logger.debug("Took {} ms to get {} records",
+          watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+      return docCount;
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
+    }
+  }
+
+  private int handleStarQuery() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    int rowCount = 0;
+
+    if (valueVector == null) {
+      throw new DrillRuntimeException("Value vector is not initialized!!!");
+    }
+    valueVector.clear();
+    valueVector
+        .allocateNew(4 * 1024 * TARGET_RECORD_COUNT, TARGET_RECORD_COUNT);
+
+    String errMsg = "Document {} is too big to fit into allocated ValueVector";
+
+    try {
+      for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
+        String doc = cursor.next().toString();
+        byte[] record = doc.getBytes(Charsets.UTF_8);
+        if (!valueVector.getMutator().setSafe(rowCount, record, 0,
+            record.length)) {
+          logger.warn(errMsg, doc);
+          if (rowCount == 0) {
+            break;
+          }
+        }
+      }
+      valueVector.getMutator().setValueCount(rowCount);
+      logger.debug("Took {} ms to get {} records",
+          watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+      return rowCount;
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
+    }
+  }
+
+  @Override
+  public int next() {
+    return isStarQuery() ? handleStarQuery() : handleNonStarQuery();
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+}
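
A note on the projection logic above: transformColumns() builds a standard
Mongo projection document, where 0 excludes a field and 1 includes it, so
for SELECT employee_id, full_name the reader asks the server for only those
fields plus the _id exclusion. A self-contained sketch (field names are
assumed for illustration):

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBObject;

    public class ProjectionSketch {
      public static void main(String[] args) {
        DBObject fields = new BasicDBObject();
        // _id is dropped unless the user projects it explicitly.
        fields.put("_id", Integer.valueOf(0));
        fields.put("employee_id", Integer.valueOf(1));
        fields.put("full_name", Integer.valueOf(1));
        // The reader then issues roughly: collection.find(filters, fields)
        System.out.println(fields);
      }
    }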

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
new file mode 100644
index 0000000..8e5fd7d
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClientOptions;
+
+public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
+  static final Logger logger = LoggerFactory
+      .getLogger(MongoScanBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MongoSubScan subScan,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns = null;
+    for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
+        .getChunkScanSpecList()) {
+      try {
+        if ((columns = subScan.getColumns()) == null) {
+          columns = GroupScan.ALL_COLUMNS;
+        }
+        MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
+            .getMongoOptions();
+        readers.add(new MongoRecordReader(scanSpec, columns, context,
+            clientOptions));
+      } catch (Exception e) {
+        logger.error("MongoRecordReader creation failed for subScan:  "
+            + subScan + ".");
+        logger.error(e.getMessage(), e);
+        throw new ExecutionSetupException(e);
+      }
+    }
+    logger.info("Number of record readers initialized : " + readers.size());
+    return new ScanBatch(subScan, context, readers.iterator());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
new file mode 100644
index 0000000..d380207
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mongodb.BasicDBObject;
+
+public class MongoScanSpec {
+  private String dbName;
+  private String collectionName;
+
+  private BasicDBObject filters;
+
+  @JsonCreator
+  public MongoScanSpec(@JsonProperty("dbName") String dbName,
+      @JsonProperty("collectionName") String collectionName) {
+    this.dbName = dbName;
+    this.collectionName = collectionName;
+  }
+
+  public MongoScanSpec(String dbName, String collectionName,
+      BasicDBObject filters) {
+    this.dbName = dbName;
+    this.collectionName = collectionName;
+    this.filters = filters;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getCollectionName() {
+    return collectionName;
+  }
+
+  public BasicDBObject getFilters() {
+    return filters;
+  }
+
+  @Override
+  public String toString() {
+    return "MongoScanSpec [dbName=" + dbName + ", collectionName="
+        + collectionName + ", filters=" + filters + "]";
+  }
+
+}
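
Since MongoScanSpec travels through the planner as JSON, its wire form is
worth seeing once. A hedged round-trip sketch (database and collection
names are made up; assumes MongoScanSpec and Jackson on the classpath):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ScanSpecSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The @JsonCreator constructor binds dbName and collectionName.
        String json = "{\"dbName\":\"employee\",\"collectionName\":\"empinfo\"}";
        MongoScanSpec spec = mapper.readValue(json, MongoScanSpec.class);
        System.out.println(mapper.writeValueAsString(spec));
      }
    }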

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
new file mode 100644
index 0000000..e46d8ec
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.util.Set;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+
+public class MongoStoragePlugin extends AbstractStoragePlugin {
+  static final Logger logger = LoggerFactory
+      .getLogger(MongoStoragePlugin.class);
+
+  private DrillbitContext context;
+  private MongoStoragePluginConfig mongoConfig;
+  private MongoSchemaFactory schemaFactory;
+
+  public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig,
+      DrillbitContext context, String name) throws IOException,
+      ExecutionSetupException {
+    this.context = context;
+    this.mongoConfig = mongoConfig;
+    this.schemaFactory = new MongoSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public MongoStoragePluginConfig getConfig() {
+    return mongoConfig;
+  }
+
+  @Override
+  public void registerSchemas(UserSession session, SchemaPlus parent) {
+    schemaFactory.registerSchemas(session, parent);
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(JSONOptions selection)
+      throws IOException {
+    MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
+        new TypeReference<MongoScanSpec>() {
+        });
+    return new MongoGroupScan(this, mongoScanSpec, null);
+  }
+
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
+  }
+
+}
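
Registering the plugin is just a storage configuration entry: the type name
comes from MongoStoragePluginConfig.NAME ("mongo"), and connection is the
only user-supplied property in this patch. A sketch of what the
bootstrap-storage-plugins.json added by this commit plausibly contains (the
connection value is a placeholder; the enabled flag is handled by Drill's
generic storage plugin machinery, not by this class):

    {
      "storage": {
        "mongo": {
          "type": "mongo",
          "connection": "mongodb://localhost:27017/",
          "enabled": false
        }
      }
    }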

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
new file mode 100644
index 0000000..b7cbf24
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientURI;
+
+@JsonTypeName(MongoStoragePluginConfig.NAME)
+public class MongoStoragePluginConfig extends StoragePluginConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(MongoStoragePluginConfig.class);
+
+  public static final String NAME = "mongo";
+
+  private String connection;
+
+  @JsonIgnore
+  private MongoClientURI clientURI;
+
+  @JsonCreator
+  public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
+    this.connection = connection;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    MongoStoragePluginConfig thatConfig = (MongoStoragePluginConfig) that;
+    return this.connection == null ? thatConfig.connection == null
+        : this.connection.equals(thatConfig.connection);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.connection != null ? this.connection.hashCode() : 0;
+  }
+
+  @JsonIgnore
+  public MongoClientOptions getMongoOptions() {
+    MongoClientURI clientURI = new MongoClientURI(connection);
+    return clientURI.getOptions();
+  }
+
+  public String getConnection() {
+    return connection;
+  }
+}
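
getMongoOptions() delegates entirely to the driver's connection-string
parser, so client tuning happens inside the connection value itself. A
small sketch against the 2.x driver API used throughout this patch (hosts
and the timeout are placeholder values):

    import com.mongodb.MongoClientOptions;
    import com.mongodb.MongoClientURI;

    public class ConnectionOptionsSketch {
      public static void main(String[] args) {
        // Driver options ride along in the connection string's query part.
        MongoClientURI uri = new MongoClientURI(
            "mongodb://host1:27017,host2:27017/?connectTimeoutMS=5000");
        MongoClientOptions options = uri.getOptions();
        System.out.println(options.getConnectTimeout()); // 5000
      }
    }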

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
new file mode 100644
index 0000000..36008cf
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.mongodb.BasicDBObject;
+
+@JsonTypeName("mongo-shard-read")
+public class MongoSubScan extends AbstractBase implements SubScan {
+  static final Logger logger = LoggerFactory.getLogger(MongoSubScan.class);
+
+  @JsonProperty
+  private final MongoStoragePluginConfig mongoPluginConfig;
+  @JsonIgnore
+  private final MongoStoragePlugin mongoStoragePlugin;
+  private final List<SchemaPath> columns;
+
+  private final List<MongoSubScanSpec> chunkScanSpecList;
+
+  @JsonCreator
+  public MongoSubScan(
+      @JacksonInject StoragePluginRegistry registry,
+      @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
+      @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+      @JsonProperty("columns") List<SchemaPath> columns)
+      throws ExecutionSetupException {
+    this.columns = columns;
+    this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
+    this.mongoStoragePlugin = (MongoStoragePlugin) registry
+        .getPlugin(mongoPluginConfig);
+    this.chunkScanSpecList = chunkScanSpecList;
+  }
+
+  public MongoSubScan(MongoStoragePlugin storagePlugin,
+      MongoStoragePluginConfig storagePluginConfig,
+      List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+    this.mongoStoragePlugin = storagePlugin;
+    this.mongoPluginConfig = storagePluginConfig;
+    this.columns = columns;
+    this.chunkScanSpecList = chunkScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(
+      PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @JsonIgnore
+  public MongoStoragePluginConfig getMongoPluginConfig() {
+    return mongoPluginConfig;
+  }
+
+  @JsonIgnore
+  public MongoStoragePlugin getMongoStoragePlugin() {
+    return mongoStoragePlugin;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public List<MongoSubScanSpec> getChunkScanSpecList() {
+    return chunkScanSpecList;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig,
+        chunkScanSpecList, columns);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return 1009;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  public static class MongoSubScanSpec {
+
+    protected String dbName;
+    protected String collectionName;
+    protected List<String> hosts;
+    protected Map<String, Object> minFilters;
+    protected Map<String, Object> maxFilters;
+
+    protected BasicDBObject filter;
+
+    @JsonCreator
+    public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+        @JsonProperty("collectionName") String collectionName,
+        @JsonProperty("hosts") List<String> hosts,
+        @JsonProperty("minFilters") Map<String, Object> minFilters,
+        @JsonProperty("maxFilters") Map<String, Object> maxFilters,
+        @JsonProperty("filters") BasicDBObject filters) {
+      this.dbName = dbName;
+      this.collectionName = collectionName;
+      this.hosts = hosts;
+      this.minFilters = minFilters;
+      this.maxFilters = maxFilters;
+      this.filter = filters;
+    }
+
+    MongoSubScanSpec() {
+    }
+
+    public String getDbName() {
+      return dbName;
+    }
+
+    public MongoSubScanSpec setDbName(String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public String getCollectionName() {
+      return collectionName;
+    }
+
+    public MongoSubScanSpec setCollectionName(String collectionName) {
+      this.collectionName = collectionName;
+      return this;
+    }
+
+    public List<String> getHosts() {
+      return hosts;
+    }
+
+    public MongoSubScanSpec setHosts(List<String> hosts) {
+      this.hosts = hosts;
+      return this;
+    }
+
+    public Map<String, Object> getMinFilters() {
+      return minFilters;
+    }
+
+    public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
+      this.minFilters = minFilters;
+      return this;
+    }
+
+    public Map<String, Object> getMaxFilters() {
+      return maxFilters;
+    }
+
+    public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
+      this.maxFilters = maxFilters;
+      return this;
+    }
+
+    public BasicDBObject getFilter() {
+      return filter;
+    }
+
+    public MongoSubScanSpec setFilter(BasicDBObject filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "MongoSubScanSpec [dbName=" + dbName + ", collectionName="
+          + collectionName + ", hosts=" + hosts + ", minFilters=" + minFilters
+          + ", maxFilters=" + maxFilters + ", filter=" + filter + "]";
+    }
+
+  }
+
+}
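
Finally, the fluent setters on MongoSubScanSpec are how MongoGroupScan can
assemble one spec per chunk when it parallelizes the scan. A usage sketch
(values are illustrative; the class is placed in the same package to reach
the package-private no-arg constructor):

    package org.apache.drill.exec.store.mongo;

    import java.util.Arrays;

    public class SubScanSpecSketch {
      public static void main(String[] args) {
        // One such spec is built per chunk/shard host list.
        MongoSubScan.MongoSubScanSpec spec = new MongoSubScan.MongoSubScanSpec()
            .setDbName("employee")
            .setCollectionName("empinfo")
            .setHosts(Arrays.asList("localhost:27017"))
            .setMinFilters(null)  // no chunk bounds on an unsharded collection
            .setMaxFilters(null)
            .setFilter(null);     // no pushed-down filter
        System.out.println(spec);
      }
    }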