Posted to commits@drill.apache.org by sm...@apache.org on 2014/09/30 09:26:06 UTC
[04/21] git commit: DRILL-98: MongoDB storage plugin
DRILL-98: MongoDB storage plugin
This commit disables the MongoDB PStore due to changes to the PStore interface.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2ca9c907
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2ca9c907
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2ca9c907
Branch: refs/heads/master
Commit: 2ca9c907bff639e08a561eac32e0acab3a0b3304
Parents: 786fd36
Author: Kamesh <ka...@gmail.com>
Authored: Sat Sep 27 18:35:28 2014 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Sun Sep 28 00:10:44 2014 -0700
----------------------------------------------------------------------
contrib/pom.xml | 1 +
contrib/storage-mongo/pom.xml | 85 +++
.../exec/store/mongo/DrillMongoConstants.java | 45 ++
.../exec/store/mongo/MongoCnxnManager.java | 73 +++
.../mongo/MongoCompareFunctionProcessor.java | 255 +++++++++
.../exec/store/mongo/MongoFilterBuilder.java | 219 ++++++++
.../drill/exec/store/mongo/MongoGroupScan.java | 538 +++++++++++++++++++
.../store/mongo/MongoPushDownFilterForScan.java | 103 ++++
.../exec/store/mongo/MongoRecordReader.java | 302 +++++++++++
.../exec/store/mongo/MongoScanBatchCreator.java | 68 +++
.../drill/exec/store/mongo/MongoScanSpec.java | 62 +++
.../exec/store/mongo/MongoStoragePlugin.java | 88 +++
.../store/mongo/MongoStoragePluginConfig.java | 72 +++
.../drill/exec/store/mongo/MongoSubScan.java | 214 ++++++++
.../drill/exec/store/mongo/MongoUtils.java | 89 +++
.../exec/store/mongo/common/ChunkInfo.java | 68 +++
.../exec/store/mongo/common/MongoCompareOp.java | 34 ++
.../exec/store/mongo/config/MongoPStore.java | 190 +++++++
.../store/mongo/config/MongoPStoreProvider.java | 76 +++
.../store/mongo/schema/MongoDatabaseSchema.java | 59 ++
.../store/mongo/schema/MongoSchemaFactory.java | 189 +++++++
.../resources/bootstrap-storage-plugins.json | 9 +
.../src/main/resources/drill-module.conf | 28 +
.../store/mongo/TestMongoChunkAssignment.java | 274 ++++++++++
distribution/pom.xml | 5 +
distribution/src/assemble/bin.xml | 1 +
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../server/options/SystemOptionManager.java | 1 +
.../exec/vector/complex/fn/JsonReader.java | 16 +-
.../vector/complex/fn/JsonReaderWithState.java | 22 +
30 files changed, 3186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 728038a..cbb5598 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>storage-hbase</module>
<module>storage-hive</module>
+ <module>storage-mongo</module>
<module>sqlline</module>
<module>data</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
new file mode 100644
index 0000000..720bf1e
--- /dev/null
+++ b/contrib/storage-mongo/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>0.6.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-mongo-storage</artifactId>
+
+ <name>contrib/mongo-storage-plugin</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>2.12.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>2.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>logback.log.dir</name>
+ <value>${project.build.directory}/surefire-reports</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
new file mode 100644
index 0000000..8261e2e
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+public interface DrillMongoConstants {
+
+ public static final String SYS_STORE_PROVIDER_MONGO_URL = "drill.exec.sys.store.provider.mongo.url";
+
+ public static final String ID = "_id";
+
+ public static final String SHARDS = "shards";
+
+ public static final String NS = "ns";
+
+ public static final String SHARD = "shard";
+
+ public static final String HOST = "host";
+
+ public static final String CHUNKS = "chunks";
+
+ public static final String SIZE = "size";
+
+ public static final String COUNT = "count";
+
+ public static final String CONFIG = "config";
+
+ public static final String MIN = "min";
+
+ public static final String MAX = "max";
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
new file mode 100644
index 0000000..a4482dd
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.ServerAddress;
+
+public class MongoCnxnManager {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(MongoCnxnManager.class);
+ private static Cache<ServerAddress, MongoClient> addressClientMap;
+
+ static {
+ addressClientMap = CacheBuilder.newBuilder().maximumSize(5)
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .removalListener(new AddressCloser()).build();
+ }
+
+ private static class AddressCloser implements
+ RemovalListener<ServerAddress, MongoClient> {
+ @Override
+ public synchronized void onRemoval(
+ RemovalNotification<ServerAddress, MongoClient> removal) {
+ removal.getValue().close();
+ logger.debug("Closed connection to {}.", removal.getKey().toString());
+ }
+ }
+
+ public synchronized static MongoClient getClient(
+ List<ServerAddress> addresses, MongoClientOptions clientOptions)
+ throws UnknownHostException {
+ // Take the first replica from the replicated servers
+ ServerAddress serverAddress = addresses.get(0);
+ MongoClient client = addressClientMap.getIfPresent(serverAddress);
+ if (client == null) {
+ client = new MongoClient(addresses, clientOptions);
+ addressClientMap.put(serverAddress, client);
+ logger.debug("Created connection to {}.", serverAddress.toString());
+ logger.debug("Number of connections opened are {}.",
+ addressClientMap.size());
+ }
+ return client;
+ }
+}
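
[Editor's note, not part of the patch] For orientation: the manager caches one MongoClient per leading replica address and evicts idle entries after ten minutes, so repeated calls with the same first address reuse a single client. A minimal sketch of that behavior, assuming the example lives in (or imports from) the org.apache.drill.exec.store.mongo package and that the addresses below are reachable:

import java.util.Arrays;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.ServerAddress;

public class MongoCnxnManagerExample {
  public static void main(String[] args) throws Exception {
    // Illustrative replica set members; the first address is the cache key.
    ServerAddress primary = new ServerAddress("localhost", 27017);
    ServerAddress secondary = new ServerAddress("localhost", 27018);
    MongoClientOptions options = MongoClientOptions.builder().build();

    MongoClient first = MongoCnxnManager.getClient(Arrays.asList(primary, secondary), options);
    MongoClient second = MongoCnxnManager.getClient(Arrays.asList(primary, secondary), options);

    // Same cached client instance until the 10-minute idle eviction removes it.
    System.out.println(first == second);
  }
}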
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
new file mode 100644
index 0000000..d588f1e
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class MongoCompareFunctionProcessor extends
+ AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+ private Object value;
+ private boolean success;
+ private boolean isEqualityFn;
+ private SchemaPath path;
+ private String functionName;
+
+ public static boolean isCompareFunction(String functionName) {
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+ }
+
+ public static MongoCompareFunctionProcessor process(FunctionCall call) {
+ String functionName = call.getName();
+ LogicalExpression nameArg = call.args.get(0);
+ LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1)
+ : null;
+ MongoCompareFunctionProcessor evaluator = new MongoCompareFunctionProcessor(
+ functionName);
+
+ if (valueArg != null) { // binary function
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+ LogicalExpression swapArg = valueArg;
+ valueArg = nameArg;
+ nameArg = swapArg;
+ evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+ .get(functionName);
+ }
+ evaluator.success = nameArg.accept(evaluator, valueArg);
+ } else if (call.args.get(0) instanceof SchemaPath) {
+ evaluator.success = true;
+ evaluator.path = (SchemaPath) nameArg;
+ }
+
+ return evaluator;
+ }
+
+ public MongoCompareFunctionProcessor(String functionName) {
+ this.success = false;
+ this.functionName = functionName;
+ this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+ .containsKey(functionName)
+ && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(
+ functionName);
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public SchemaPath getPath() {
+ return path;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ @Override
+ public Boolean visitCastExpression(CastExpression e,
+ LogicalExpression valueArg) throws RuntimeException {
+ if (e.getInput() instanceof CastExpression
+ || e.getInput() instanceof SchemaPath) {
+ return e.getInput().accept(this, valueArg);
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitConvertExpression(ConvertExpression e,
+ LogicalExpression valueArg) throws RuntimeException {
+ if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM
+ && e.getInput() instanceof SchemaPath) {
+ String encodingType = e.getEncodingType();
+ switch (encodingType) {
+ case "INT_BE":
+ case "INT":
+ case "UINT_BE":
+ case "UINT":
+ case "UINT4_BE":
+ case "UINT4":
+ if (valueArg instanceof IntExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ this.value = ((IntExpression) valueArg).getInt();
+ }
+ break;
+ case "BIGINT_BE":
+ case "BIGINT":
+ case "UINT8_BE":
+ case "UINT8":
+ if (valueArg instanceof LongExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ this.value = ((LongExpression) valueArg).getLong();
+ }
+ break;
+ case "FLOAT":
+ if (valueArg instanceof FloatExpression && isEqualityFn) {
+ this.value = ((FloatExpression) valueArg).getFloat();
+ }
+ break;
+ case "DOUBLE":
+ if (valueArg instanceof DoubleExpression && isEqualityFn) {
+ this.value = ((DoubleExpression) valueArg).getDouble();
+ }
+ break;
+ case "TIME_EPOCH":
+ case "TIME_EPOCH_BE":
+ if (valueArg instanceof TimeExpression) {
+ this.value = ((TimeExpression) valueArg).getTime();
+ }
+ break;
+ case "DATE_EPOCH":
+ case "DATE_EPOCH_BE":
+ if (valueArg instanceof DateExpression) {
+ this.value = ((DateExpression) valueArg).getDate();
+ }
+ break;
+ case "BOOLEAN_BYTE":
+ if (valueArg instanceof BooleanExpression) {
+ this.value = ((BooleanExpression) valueArg).getBoolean();
+ }
+ break;
+ case "UTF8":
+ // let visitSchemaPath() handle this.
+ return e.getInput().accept(this, valueArg);
+ }
+
+ if (value != null) {
+ this.path = (SchemaPath) e.getInput();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg)
+ throws RuntimeException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg)
+ throws RuntimeException {
+ if (valueArg instanceof QuotedString) {
+ this.value = ((QuotedString) valueArg).value;
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof IntExpression) {
+ this.value = ((IntExpression) valueArg).getInt();
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof LongExpression) {
+ this.value = ((LongExpression) valueArg).getLong();
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof FloatExpression) {
+ this.value = ((FloatExpression) valueArg).getFloat();
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof DoubleExpression) {
+ this.value = ((DoubleExpression) valueArg).getDouble();
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof BooleanExpression) {
+ this.value = ((BooleanExpression) valueArg).getBoolean();
+ this.path = path;
+ return true;
+ }
+
+ return false;
+ }
+
+ private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+ static {
+ ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet
+ .builder();
+ VALUE_EXPRESSION_CLASSES = builder.add(BooleanExpression.class)
+ .add(DateExpression.class).add(DoubleExpression.class)
+ .add(FloatExpression.class).add(IntExpression.class)
+ .add(LongExpression.class).add(QuotedString.class)
+ .add(TimeExpression.class).build();
+ }
+
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ // unary functions
+ .put("isnotnull", "isnotnull")
+ .put("isNotNull", "isNotNull")
+ .put("is not null", "is not null")
+ .put("isnull", "isnull")
+ .put("isNull", "isNull")
+ .put("is null", "is null")
+ // binary functions
+ .put("equal", "equal").put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than").build();
+ }
+
+}
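
[Editor's note, not part of the patch] To make the transpose map concrete: when the literal sits on the left of a binary comparison, process() swaps the two arguments and replaces the function name with its transposed form, so a predicate written as 5 > a is handled as a < 5. A tiny sketch of that lookup, using the same Guava ImmutableMap idiom:

import com.google.common.collect.ImmutableMap;

public class TransposeLookupExample {
  public static void main(String[] args) {
    ImmutableMap<String, String> transpose = ImmutableMap.of(
        "greater_than", "less_than",
        "less_than", "greater_than",
        "greater_than_or_equal_to", "less_than_or_equal_to",
        "less_than_or_equal_to", "greater_than_or_equal_to");
    // 5 > a  ->  operands swapped  ->  a less_than 5
    System.out.println(transpose.get("greater_than"));
  }
}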
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
new file mode 100644
index 0000000..def793a
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.mongodb.BasicDBObject;
+
+public class MongoFilterBuilder extends
+ AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+ DrillMongoConstants {
+ static final Logger logger = LoggerFactory
+ .getLogger(MongoFilterBuilder.class);
+ final MongoGroupScan groupScan;
+ final LogicalExpression le;
+ private boolean allExpressionsConverted = true;
+
+ public MongoFilterBuilder(MongoGroupScan groupScan,
+ LogicalExpression conditionExp) {
+ this.groupScan = groupScan;
+ this.le = conditionExp;
+ }
+
+ public MongoScanSpec parseTree() {
+ MongoScanSpec parsedSpec = le.accept(this, null);
+ if (parsedSpec != null) {
+ parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getScanSpec(),
+ parsedSpec);
+ }
+ return parsedSpec;
+ }
+
+ private MongoScanSpec mergeScanSpecs(String functionName,
+ MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+ BasicDBObject newFilter = null;
+
+ switch (functionName) {
+ case "booleanAnd":
+ if (leftScanSpec.getFilters() != null
+ && rightScanSpec.getFilters() != null) {
+ newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
+ rightScanSpec.getFilters());
+ } else if (leftScanSpec.getFilters() != null) {
+ newFilter = leftScanSpec.getFilters();
+ } else {
+ newFilter = rightScanSpec.getFilters();
+ }
+ break;
+ case "booleanOr":
+ newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
+ rightScanSpec.getFilters());
+ }
+ return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
+ .getScanSpec().getCollectionName(), newFilter);
+ }
+
+ public boolean isAllExpressionsConverted() {
+ return allExpressionsConverted;
+ }
+
+ @Override
+ public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+ throws RuntimeException {
+ allExpressionsConverted = false;
+ return null;
+ }
+
+ @Override
+ public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+ List<LogicalExpression> args = op.args;
+ MongoScanSpec nodeScanSpec = null;
+ String functionName = op.getName();
+ for (int i = 0; i < args.size(); ++i) {
+ switch (functionName) {
+ case "booleanAnd":
+ case "booleanOr":
+ if (nodeScanSpec == null) {
+ nodeScanSpec = args.get(i).accept(this, null);
+ } else {
+ MongoScanSpec scanSpec = args.get(i).accept(this, null);
+ if (scanSpec != null) {
+ nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+ } else {
+ allExpressionsConverted = false;
+ }
+ }
+ break;
+ }
+ }
+ return nodeScanSpec;
+ }
+
+ @Override
+ public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+ throws RuntimeException {
+ MongoScanSpec nodeScanSpec = null;
+ String functionName = call.getName();
+ ImmutableList<LogicalExpression> args = call.args;
+
+ if (MongoCompareFunctionProcessor.isCompareFunction(functionName)) {
+ MongoCompareFunctionProcessor processor = MongoCompareFunctionProcessor
+ .process(call);
+ if (processor.isSuccess()) {
+ try {
+ nodeScanSpec = createMongoScanSpec(processor.getFunctionName(),
+ processor.getPath(), processor.getValue());
+ } catch (Exception e) {
+ logger.error(" Failed to creare Filter ", e);
+ // throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ } else {
+ switch (functionName) {
+ case "booleanAnd":
+ case "booleanOr":
+ MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
+ MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
+ if (leftScanSpec != null && rightScanSpec != null) {
+ nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
+ rightScanSpec);
+ } else {
+ allExpressionsConverted = false;
+ if ("booleanAnd".equals(functionName)) {
+ nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+ }
+ }
+ break;
+ }
+ }
+
+ if (nodeScanSpec == null) {
+ allExpressionsConverted = false;
+ }
+
+ return nodeScanSpec;
+ }
+
+ private MongoScanSpec createMongoScanSpec(String functionName,
+ SchemaPath field, Object fieldValue) throws ClassNotFoundException,
+ IOException {
+ // extract the field name
+ String fieldName = field.getAsUnescapedPath();
+ MongoCompareOp compareOp = null;
+ switch (functionName) {
+ case "equal":
+ compareOp = MongoCompareOp.EQUAL;
+ break;
+ case "not_equal":
+ compareOp = MongoCompareOp.NOT_EQUAL;
+ break;
+ case "greater_than_or_equal_to":
+ compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+ break;
+ case "greater_than":
+ compareOp = MongoCompareOp.GREATER;
+ break;
+ case "less_than_or_equal_to":
+ compareOp = MongoCompareOp.LESS_OR_EQUAL;
+ break;
+ case "less_than":
+ compareOp = MongoCompareOp.LESS;
+ break;
+ case "isnull":
+ case "isNull":
+ case "is null":
+ compareOp = MongoCompareOp.IFNULL;
+ break;
+ case "isnotnull":
+ case "isNotNull":
+ case "is not null":
+ compareOp = MongoCompareOp.IFNOTNULL;
+ break;
+ }
+
+ if (compareOp != null) {
+ BasicDBObject queryFilter = new BasicDBObject();
+ if (compareOp == MongoCompareOp.IFNULL) {
+ queryFilter.put(fieldName,
+ new BasicDBObject(MongoCompareOp.EQUAL.getCompareOp(), null));
+ } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+ queryFilter.put(fieldName,
+ new BasicDBObject(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+ } else {
+ queryFilter.put(fieldName, new BasicDBObject(compareOp.getCompareOp(),
+ fieldValue));
+ }
+ return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
+ .getScanSpec().getCollectionName(), queryFilter);
+ }
+ return null;
+ }
+
+}
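
[Editor's note, not part of the patch] A rough illustration of the filter documents createMongoScanSpec builds. It assumes MongoCompareOp.LESS and MongoCompareOp.EQUAL map to the Mongo "$lt" and "$eq" operators, and the field name and value are made up:

import com.mongodb.BasicDBObject;

public class MongoFilterDocExample {
  public static void main(String[] args) {
    // Pushing down `age < 30` yields { "age" : { "$lt" : 30 } }.
    BasicDBObject lessThan = new BasicDBObject("age", new BasicDBObject("$lt", 30));
    // `age IS NULL` is expressed as an equality against null: { "age" : { "$eq" : null } }.
    BasicDBObject isNull = new BasicDBObject("age", new BasicDBObject("$eq", null));
    System.out.println(lessThan);
    System.out.println(isNull);
  }
}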
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
new file mode 100644
index 0000000..ccce3d5
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.common.ChunkInfo;
+import org.bson.types.MaxKey;
+import org.bson.types.MinKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+
+@JsonTypeName("mongo-scan")
+public class MongoGroupScan extends AbstractGroupScan implements
+ DrillMongoConstants {
+
+ private static final Integer select = Integer.valueOf(1);
+
+ static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+
+ private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
+ @Override
+ public int compare(List<MongoSubScanSpec> list1,
+ List<MongoSubScanSpec> list2) {
+ return list1.size() - list2.size();
+ }
+ };
+
+ private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections
+ .reverseOrder(LIST_SIZE_COMPARATOR);
+
+ private MongoStoragePlugin storagePlugin;
+
+ private MongoStoragePluginConfig storagePluginConfig;
+
+ private MongoScanSpec scanSpec;
+
+ private List<SchemaPath> columns;
+
+ private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+
+ // Sharding with replica sets contains all the replica server addresses for
+ // each chunk.
+ private Map<String, Set<ServerAddress>> chunksMapping;
+
+ private Map<String, List<ChunkInfo>> chunksInverseMapping;
+
+ private Stopwatch watch = new Stopwatch();
+
+ private boolean filterPushedDown = false;
+
+ @JsonCreator
+ public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
+ @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
+ ExecutionSetupException {
+ this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+ scanSpec, columns);
+ }
+
+ public MongoGroupScan(MongoStoragePlugin storagePlugin,
+ MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+ this.storagePlugin = storagePlugin;
+ this.storagePluginConfig = storagePlugin.getConfig();
+ this.scanSpec = scanSpec;
+ this.columns = columns;
+ this.storagePluginConfig.getConnection();
+ init();
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ * @param that
+ * The MongoGroupScan to clone
+ */
+ private MongoGroupScan(MongoGroupScan that) {
+ this.scanSpec = that.scanSpec;
+ this.columns = that.columns;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginConfig = that.storagePluginConfig;
+ this.chunksMapping = that.chunksMapping;
+ this.chunksInverseMapping = that.chunksInverseMapping;
+ this.endpointFragmentMapping = that.endpointFragmentMapping;
+ this.filterPushedDown = that.filterPushedDown;
+ }
+
+ @JsonIgnore
+ public boolean isFilterPushedDown() {
+ return filterPushedDown;
+ }
+
+ @JsonIgnore
+ public void setFilterPushedDown(boolean filterPushedDown) {
+ this.filterPushedDown = filterPushedDown;
+ }
+
+ private boolean isShardedCluster(MongoClient client) {
+ // need to check better way of identifying
+ List<String> databaseNames = client.getDatabaseNames();
+ return databaseNames.contains(CONFIG);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void init() throws IOException {
+ MongoClient client = null;
+ try {
+ MongoClientURI clientURI = new MongoClientURI(
+ this.storagePluginConfig.getConnection());
+ client = new MongoClient(clientURI);
+
+ chunksMapping = Maps.newHashMap();
+ chunksInverseMapping = Maps.newLinkedHashMap();
+ if (isShardedCluster(client)) {
+ DB db = client.getDB(CONFIG);
+ db.setReadPreference(ReadPreference.nearest());
+ DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
+
+ DBObject query = new BasicDBObject(1);
+ query
+ .put(
+ NS,
+ this.scanSpec.getDbName() + "."
+ + this.scanSpec.getCollectionName());
+
+ DBObject fields = new BasicDBObject();
+ fields.put(SHARD, select);
+ fields.put(MIN, select);
+ fields.put(MAX, select);
+
+ DBCursor chunkCursor = chunksCollection.find(query, fields);
+
+ DBCollection shardsCollection = db.getCollectionFromString(SHARDS);
+
+ fields = new BasicDBObject();
+ fields.put(HOST, select);
+
+ while (chunkCursor.hasNext()) {
+ DBObject chunkObj = chunkCursor.next();
+ String shardName = (String) chunkObj.get(SHARD);
+ String chunkId = (String) chunkObj.get(ID);
+ query = new BasicDBObject().append(ID, shardName);
+ DBCursor hostCursor = shardsCollection.find(query, fields);
+ while (hostCursor.hasNext()) {
+ DBObject hostObj = hostCursor.next();
+ String hostEntry = (String) hostObj.get(HOST);
+ String[] tagAndHost = StringUtils.split(hostEntry, '/');
+ String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
+ tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
+ Set<ServerAddress> addressList = chunksMapping.get(chunkId);
+ if (addressList == null) {
+ addressList = Sets.newHashSet();
+ chunksMapping.put(chunkId, addressList);
+ }
+ for (String host : hosts) {
+ addressList.add(new ServerAddress(host));
+ }
+ ServerAddress address = addressList.iterator().next();
+
+ List<ChunkInfo> chunkList = chunksInverseMapping.get(address
+ .getHost());
+ if (chunkList == null) {
+ chunkList = Lists.newArrayList();
+ chunksInverseMapping.put(address.getHost(), chunkList);
+ }
+ ChunkInfo chunkInfo = new ChunkInfo(Arrays.asList(hosts), chunkId);
+ DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
+
+ Map<String, Object> minFilters = Maps.newHashMap();
+ Map minMap = minObj.toMap();
+ Set keySet = minMap.keySet();
+ for (Object keyObj : keySet) {
+ Object object = minMap.get(keyObj);
+ if (!(object instanceof MinKey)) {
+ minFilters.put(keyObj.toString(), object);
+ }
+ }
+ chunkInfo.setMinFilters(minFilters);
+
+ DBObject maxObj = (BasicDBObject) chunkObj.get(MAX);
+ Map<String, Object> maxFilters = Maps.newHashMap();
+ Map maxMap = maxObj.toMap();
+ keySet = maxMap.keySet();
+ for (Object keyObj : keySet) {
+ Object object = maxMap.get(keyObj);
+ if (!(object instanceof MaxKey)) {
+ maxFilters.put(keyObj.toString(), object);
+ }
+ }
+
+ chunkInfo.setMaxFilters(maxFilters);
+ chunkList.add(chunkInfo);
+ }
+ }
+ } else {
+ String chunkName = scanSpec.getDbName() + "."
+ + scanSpec.getCollectionName();
+ List<String> hosts = clientURI.getHosts();
+ Set<ServerAddress> addressList = Sets.newHashSet();
+
+ for (String host : hosts) {
+ addressList.add(new ServerAddress(host));
+ }
+ chunksMapping.put(chunkName, addressList);
+
+ String host = hosts.get(0);
+ ServerAddress address = new ServerAddress(host);
+ ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
+ chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
+ chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
+ List<ChunkInfo> chunksList = Lists.newArrayList();
+ chunksList.add(chunkInfo);
+ chunksInverseMapping.put(address.getHost(), chunksList);
+ }
+ } catch (UnknownHostException e) {
+ throw new DrillRuntimeException(e.getMessage(), e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ MongoGroupScan clone = new MongoGroupScan(this);
+ clone.columns = columns;
+ return clone;
+ }
+
+ @Override
+ public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return true;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints)
+ throws PhysicalOperatorSetupException {
+ logger.debug("Incoming endpoints :" + endpoints);
+ watch.reset();
+ watch.start();
+
+ final int numSlots = endpoints.size();
+ int totalAssignmentsTobeDone = chunksMapping.size();
+
+ Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, String
+ .format("Incoming endpoints %d is greater than number of chunks %d",
+ numSlots, totalAssignmentsTobeDone));
+
+ final int minPerEndpointSlot = (int) Math
+ .floor((double) totalAssignmentsTobeDone / numSlots);
+ final int maxPerEndpointSlot = (int) Math
+ .ceil((double) totalAssignmentsTobeDone / numSlots);
+
+ endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+ Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+ for (int i = 0; i < numSlots; ++i) {
+ endpointFragmentMapping.put(i, new ArrayList<MongoSubScanSpec>(
+ maxPerEndpointSlot));
+ String hostname = endpoints.get(i).getAddress();
+ Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+ if (hostIndexQueue == null) {
+ hostIndexQueue = Lists.newLinkedList();
+ endpointHostIndexListMap.put(hostname, hostIndexQueue);
+ }
+ hostIndexQueue.add(i);
+ }
+
+ Set<Entry<String, List<ChunkInfo>>> chunksToAssignSet = Sets
+ .newHashSet(chunksInverseMapping.entrySet());
+
+ for (Iterator<Entry<String, List<ChunkInfo>>> chunksIterator = chunksToAssignSet
+ .iterator(); chunksIterator.hasNext();) {
+ Entry<String, List<ChunkInfo>> chunkEntry = chunksIterator.next();
+ Queue<Integer> slots = endpointHostIndexListMap.get(chunkEntry.getKey());
+ if (slots != null) {
+ for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
+ Integer slotIndex = slots.poll();
+ List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+ .get(slotIndex);
+ subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
+ slots.offer(slotIndex);
+ }
+ chunksIterator.remove();
+ }
+ }
+
+ PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+ numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<List<MongoSubScanSpec>>(
+ numSlots, LIST_SIZE_COMPARATOR_REV);
+ for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ if (listOfScan.size() < minPerEndpointSlot) {
+ minHeap.offer(listOfScan);
+ } else if (listOfScan.size() > minPerEndpointSlot) {
+ maxHeap.offer(listOfScan);
+ }
+ }
+
+ if (chunksToAssignSet.size() > 0) {
+ for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
+ for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
+ List<MongoSubScanSpec> smallestList = minHeap.poll();
+ smallestList.add(buildSubScanSpecAndGet(chunkInfo));
+ minHeap.offer(smallestList);
+ }
+ }
+ }
+
+ while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
+ List<MongoSubScanSpec> smallestList = minHeap.poll();
+ List<MongoSubScanSpec> largestList = maxHeap.poll();
+ smallestList.add(largestList.remove(largestList.size() - 1));
+ if (largestList.size() > minPerEndpointSlot) {
+ maxHeap.offer(largestList);
+ }
+ if (smallestList.size() < minPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+
+ logger.debug(
+ "Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+ watch.elapsed(TimeUnit.NANOSECONDS) / 1000, endpoints,
+ endpointFragmentMapping.toString());
+ }
+
+ private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+ MongoSubScanSpec subScanSpec = new MongoSubScanSpec()
+ .setDbName(scanSpec.getDbName())
+ .setCollectionName(scanSpec.getCollectionName())
+ .setHosts(chunkInfo.getChunkLocList())
+ .setMinFilters(chunkInfo.getMinFilters())
+ .setMaxFilters(chunkInfo.getMaxFilters())
+ .setFilter(scanSpec.getFilters());
+ return subScanSpec;
+ }
+
+ @Override
+ public MongoSubScan getSpecificScan(int minorFragmentId)
+ throws ExecutionSetupException {
+ return new MongoSubScan(storagePlugin, storagePluginConfig,
+ endpointFragmentMapping.get(minorFragmentId), columns);
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return chunksMapping.size();
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ MongoClientURI clientURI = new MongoClientURI(
+ this.storagePluginConfig.getConnection());
+ try {
+ List<String> hosts = clientURI.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : hosts) {
+ addresses.add(new ServerAddress(host));
+ }
+ MongoClient client = MongoCnxnManager.getClient(addresses,
+ clientURI.getOptions());
+ DB db = client.getDB(scanSpec.getDbName());
+ db.setReadPreference(ReadPreference.nearest());
+ DBCollection collection = db.getCollectionFromString(scanSpec
+ .getCollectionName());
+ CommandResult stats = collection.getStats();
+ return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
+ stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE));
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+ throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ return new MongoGroupScan(this);
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ watch.reset();
+ watch.start();
+
+ Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
+ for (DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) {
+ endpointMap.put(endpoint.getAddress(), endpoint);
+ logger.debug("Endpoint address: {}", endpoint.getAddress());
+ }
+
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
+ // As of now, considering only the first replica, though there may be
+ // multiple replicas for each chunk.
+ for (Set<ServerAddress> addressList : chunksMapping.values()) {
+ // Each replica can be on multiple machines, take the first one, which
+ // meets affinity.
+ for (ServerAddress address : addressList) {
+ DrillbitEndpoint ep = endpointMap.get(address.getHost());
+ if (ep != null) {
+ EndpointAffinity affinity = affinityMap.get(ep);
+ if (affinity == null) {
+ affinityMap.put(ep, new EndpointAffinity(ep, 1));
+ } else {
+ affinity.addAffinity(1);
+ }
+ break;
+ }
+ }
+ }
+ logger.debug("Took {} µs to get operator affinity",
+ watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+ logger.debug("Affined drillbits : " + affinityMap.values());
+ return Lists.newArrayList(affinityMap.values());
+ }
+
+ @JsonProperty
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonProperty("mongoScanSpec")
+ public MongoScanSpec getScanSpec() {
+ return scanSpec;
+ }
+
+ @JsonProperty("storage")
+ public MongoStoragePluginConfig getStorageConfig() {
+ return storagePluginConfig;
+ }
+
+ @JsonIgnore
+ public MongoStoragePlugin getStoragePlugin() {
+ return storagePlugin;
+ }
+
+ @Override
+ public String toString() {
+ return "MongoGroupScan [MongoScanSpec=" + scanSpec + ", columns=" + columns
+ + "]";
+ }
+
+ @VisibleForTesting
+ MongoGroupScan() {
+ }
+
+ @JsonIgnore
+ @VisibleForTesting
+ void setChunksMapping(Map<String, Set<ServerAddress>> chunksMapping) {
+ this.chunksMapping = chunksMapping;
+ }
+
+ @JsonIgnore
+ @VisibleForTesting
+ void setScanSpec(MongoScanSpec scanSpec) {
+ this.scanSpec = scanSpec;
+ }
+
+ @JsonIgnore
+ @VisibleForTesting
+ void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+ this.chunksInverseMapping = chunksInverseMapping;
+ }
+
+}
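
[Editor's note, not part of the patch] The most involved part of this file is applyAssignments: chunks are first handed to fragments running on the same host, the leftovers are dealt to the least loaded fragments, and the lists are then leveled by moving chunks from the fullest list to the emptiest until every list reaches the per-slot floor. A stripped-down, standalone sketch of that final leveling step, with made-up chunk names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class AssignmentLevelingSketch {
  public static void main(String[] args) {
    // Three fragments with an uneven number of chunk assignments.
    List<List<String>> fragments = new ArrayList<List<String>>();
    fragments.add(new ArrayList<String>(Arrays.asList("chunk1", "chunk2", "chunk3", "chunk4")));
    fragments.add(new ArrayList<String>(Arrays.asList("chunk5")));
    fragments.add(new ArrayList<String>());

    int totalChunks = 5;
    int minPerSlot = (int) Math.floor((double) totalChunks / fragments.size()); // 1

    Comparator<List<String>> bySize = new Comparator<List<String>>() {
      @Override
      public int compare(List<String> a, List<String> b) {
        return a.size() - b.size();
      }
    };
    PriorityQueue<List<String>> minHeap =
        new PriorityQueue<List<String>>(fragments.size(), bySize);
    PriorityQueue<List<String>> maxHeap =
        new PriorityQueue<List<String>>(fragments.size(), Collections.reverseOrder(bySize));
    for (List<String> fragment : fragments) {
      if (fragment.size() < minPerSlot) {
        minHeap.offer(fragment);
      } else if (fragment.size() > minPerSlot) {
        maxHeap.offer(fragment);
      }
    }

    // Move one chunk at a time from the most loaded list to the least loaded one.
    while (minHeap.peek() != null && minHeap.peek().size() < minPerSlot) {
      List<String> smallest = minHeap.poll();
      List<String> largest = maxHeap.poll();
      smallest.add(largest.remove(largest.size() - 1));
      if (largest.size() > minPerSlot) {
        maxHeap.offer(largest);
      }
      if (smallest.size() < minPerSlot) {
        minHeap.offer(smallest);
      }
    }

    // e.g. [[chunk1, chunk2, chunk3], [chunk5], [chunk4]]
    System.out.println(fragments);
  }
}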
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
new file mode 100644
index 0000000..9af49b1
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
+ private static final Logger logger = LoggerFactory
+ .getLogger(MongoPushDownFilterForScan.class);
+ public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownFilterForScan();
+
+ private MongoPushDownFilterForScan() {
+ super(
+ RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+ "MongoPushDownFilterForScan");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ final FilterPrel filter = (FilterPrel) call.rel(0);
+ final RexNode condition = filter.getCondition();
+
+ MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
+ if (groupScan.isFilterPushedDown()) {
+ return;
+ }
+
+ LogicalExpression conditionExp = DrillOptiq.toDrill(
+ new DrillParseContext(), scan, condition);
+ MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
+ conditionExp);
+ MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
+ if (newScanSpec == null) {
+ return; // no filter pushdown so nothing to apply.
+ }
+
+ MongoGroupScan newGroupsScan = null;
+ try {
+ newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
+ newScanSpec, groupScan.getColumns());
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
+ newGroupsScan.setFilterPushedDown(true);
+
+ final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
+ newGroupsScan, scan.getRowType());
+ if (mongoFilterBuilder.isAllExpressionsConverted()) {
+ /*
+ * Since we could convert the entire filter condition expression into an
+ * Mongo filter, we can eliminate the filter operator altogether.
+ */
+ call.transformTo(newScanPrel);
+ } else {
+ call.transformTo(filter.copy(filter.getTraitSet(),
+ ImmutableList.of((RelNode) newScanPrel)));
+ }
+
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ if (scan.getGroupScan() instanceof MongoGroupScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
new file mode 100644
index 0000000..ad4e119
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+
+public class MongoRecordReader extends AbstractRecordReader {
+ static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
+
+ private static final int TARGET_RECORD_COUNT = 3000;
+
+ private DBCollection collection;
+ private DBCursor cursor;
+
+ private NullableVarCharVector valueVector;
+
+ private JsonReaderWithState jsonReaderWithState;
+ private VectorContainerWriter writer;
+ private List<SchemaPath> columns;
+
+ private BasicDBObject filters;
+ private DBObject fields;
+
+ private MongoClientOptions clientOptions;
+ private FragmentContext fragmentContext;
+ private OperatorContext operatorContext;
+
+ private Boolean enableAllTextMode;
+
+ public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec,
+ List<SchemaPath> projectedColumns, FragmentContext context,
+ MongoClientOptions clientOptions) {
+ this.clientOptions = clientOptions;
+ this.fields = new BasicDBObject();
+ // exclude _id field, if not mentioned by user.
+ this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
+ this.columns = projectedColumns;
+ setColumns(projectedColumns);
+ transformColumns(projectedColumns);
+ this.fragmentContext = context;
+ this.filters = new BasicDBObject();
+ Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
+ subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+ buildFilters(subScanSpec.getFilter(), mergedFilters);
+ enableAllTextMode = fragmentContext.getDrillbitContext().getOptionManager()
+ .getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
+ init(subScanSpec);
+ }
+
+ @Override
+ protected Collection<SchemaPath> transformColumns(
+ Collection<SchemaPath> projectedColumns) {
+ Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+ if (!isStarQuery()) {
+ Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
+ while (columnIterator.hasNext()) {
+ SchemaPath column = columnIterator.next();
+ NameSegment root = column.getRootSegment();
+ String fieldName = root.getPath();
+ transformed.add(SchemaPath.getSimplePath(fieldName));
+ this.fields.put(fieldName, Integer.valueOf(1));
+ }
+ }
+ return transformed;
+ }
+
+ private void buildFilters(BasicDBObject pushdownFilters,
+ Map<String, List<BasicDBObject>> mergedFilters) {
+ for (Entry<String, List<BasicDBObject>> entry : mergedFilters.entrySet()) {
+ List<BasicDBObject> list = entry.getValue();
+ if (list.size() == 1) {
+ this.filters.putAll(list.get(0).toMap());
+ } else {
+ BasicDBObject andQueryFilter = new BasicDBObject();
+ andQueryFilter.put("$and", list);
+ this.filters.putAll(andQueryFilter.toMap());
+ }
+ }
+ if (pushdownFilters != null && !pushdownFilters.toMap().isEmpty()) {
+ if (!mergedFilters.isEmpty()) {
+ this.filters = MongoUtils.andFilterAtIndex(this.filters,
+ pushdownFilters);
+ } else {
+ this.filters = pushdownFilters;
+ }
+ }
+ }
+
+ private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+ try {
+ List<String> hosts = subScanSpec.getHosts();
+ List<ServerAddress> addresses = Lists.newArrayList();
+ for (String host : hosts) {
+ addresses.add(new ServerAddress(host));
+ }
+ MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
+ DB db = client.getDB(subScanSpec.getDbName());
+ db.setReadPreference(ReadPreference.nearest());
+ collection = db.getCollection(subScanSpec.getCollectionName());
+ } catch (UnknownHostException e) {
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ if (isStarQuery()) {
+ try {
+ SchemaPath starColumn = SchemaPath.getSimplePath("*");
+ MaterializedField field = MaterializedField.create(starColumn,
+ Types.optional(MinorType.VARCHAR));
+ valueVector = output.addField(field, NullableVarCharVector.class);
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ } else {
+ try {
+ this.writer = new VectorContainerWriter(output);
+ this.jsonReaderWithState = new JsonReaderWithState(
+ fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
+ } catch (IOException e) {
+ throw new ExecutionSetupException(
+ "Failure in Mongo JsonReader initialization.", e);
+ }
+ }
+ logger.info("Filters Applied : " + filters);
+ logger.info("Fields Selected :" + fields);
+ cursor = collection.find(filters, fields);
+ }
+
+ private int handleNonStarQuery() {
+ writer.allocate();
+ writer.reset();
+
+ int docCount = 0;
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ int rowCount = 0;
+
+ try {
+ String errMsg = "Document {} is too big to fit into allocated ValueVector";
+ done: for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
+ writer.setPosition(docCount);
+ String doc = cursor.next().toString();
+ byte[] record = doc.getBytes(Charsets.UTF_8);
+ switch (jsonReaderWithState.write(record, writer)) {
+ case WRITE_SUCCEED:
+ docCount++;
+ break;
+
+ case WRITE_FAILED:
+ if (docCount == 0) {
+ throw new DrillRuntimeException("Document is too big to fit into allocated ValueVector: " + doc);
+ }
+ logger.warn(errMsg, doc);
+ break done;
+
+ default:
+ break done;
+ }
+ }
+
+ for (SchemaPath sp : jsonReaderWithState.getNullColumns()) {
+ PathSegment root = sp.getRootSegment();
+ BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
+ if (root.getChild() != null && !root.getChild().isArray()) {
+ fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
+ while (root.getChild().getChild() != null
+ && !root.getChild().isArray()) {
+ fieldWriter = fieldWriter.map(root.getChild().getNameSegment()
+ .getPath());
+ root = root.getChild();
+ }
+ fieldWriter.integer(root.getChild().getNameSegment().getPath());
+ } else {
+ fieldWriter.integer(root.getNameSegment().getPath());
+ }
+ }
+
+ writer.setValueCount(docCount);
+ logger.debug("Took {} ms to get {} records",
+ watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+ return docCount;
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
+ }
+ }
+
+ private int handleStarQuery() {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ int rowCount = 0;
+
+ if (valueVector == null) {
+ throw new DrillRuntimeException("Value vector is not initialized!!!");
+ }
+ valueVector.clear();
+ valueVector
+ .allocateNew(4 * 1024 * TARGET_RECORD_COUNT, TARGET_RECORD_COUNT);
+
+ String errMsg = "Document {} is too big to fit into allocated ValueVector";
+
+ try {
+ for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
+ String doc = cursor.next().toString();
+ byte[] record = doc.getBytes(Charsets.UTF_8);
+ if (!valueVector.getMutator().setSafe(rowCount, record, 0,
+ record.length)) {
+ logger.warn(errMsg, doc);
+ if (rowCount == 0) {
+ break;
+ }
+ }
+ }
+ valueVector.getMutator().setValueCount(rowCount);
+ logger.debug("Took {} ms to get {} records",
+ watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+ return rowCount;
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
+ }
+ }
+
+ @Override
+ public int next() {
+ return isStarQuery() ? handleStarQuery() : handleNonStarQuery();
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public void setOperatorContext(OperatorContext operatorContext) {
+ this.operatorContext = operatorContext;
+ }
+
+}
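
The trickiest part of the reader above is buildFilters(), which folds a chunk's shard-key boundary conditions (minFilters/maxFilters) with any filter pushed down from the SQL WHERE clause into a single query document. The standalone sketch below is illustrative only: the field names, values, and class name are invented, and it assumes the same legacy com.mongodb driver classes the reader already imports. It shows the kind of $and document that composition produces.

import java.util.List;

import com.google.common.collect.Lists;
import com.mongodb.BasicDBObject;

// Hypothetical sketch: combine chunk-boundary conditions with a pushed-down
// filter into one $and query document, similar in spirit to buildFilters()
// and MongoUtils.andFilterAtIndex().
public class FilterCompositionSketch {
  public static void main(String[] args) {
    // Invented chunk boundary on a shard key "age": 18 <= age < 65.
    BasicDBObject minCondition = new BasicDBObject("age", new BasicDBObject("$gte", 18));
    BasicDBObject maxCondition = new BasicDBObject("age", new BasicDBObject("$lt", 65));

    // Invented filter pushed down from the WHERE clause.
    BasicDBObject pushdown = new BasicDBObject("state", "CA");

    List<BasicDBObject> conditions = Lists.newArrayList(minCondition, maxCondition, pushdown);
    BasicDBObject query = new BasicDBObject("$and", conditions);

    // Prints a single query document of the kind setup() opens its cursor with.
    System.out.println(query);
  }
}

Whether matching documents are then materialized into typed vectors or read as plain text depends on the all-text-mode option the constructor reads via ExecConstants.MONGO_ALL_TEXT_MODE and passes to JsonReaderWithState.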
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
new file mode 100644
index 0000000..8e5fd7d
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClientOptions;
+
+public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
+ static final Logger logger = LoggerFactory
+ .getLogger(MongoScanBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, MongoSubScan subScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ List<RecordReader> readers = Lists.newArrayList();
+ List<SchemaPath> columns = null;
+ for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
+ .getChunkScanSpecList()) {
+ try {
+ if ((columns = subScan.getColumns()) == null) {
+ columns = GroupScan.ALL_COLUMNS;
+ }
+ MongoClientOptions clientOptions = subScan.getMongoPluginConfig()
+ .getMongoOptions();
+ readers.add(new MongoRecordReader(scanSpec, columns, context,
+ clientOptions));
+ } catch (Exception e) {
+ logger.error("MongoRecordReader creation failed for subScan: "
+ + subScan + ".");
+ logger.error(e.getMessage(), e);
+ throw new ExecutionSetupException(e);
+ }
+ }
+ logger.info("Number of record readers initialized : " + readers.size());
+ return new ScanBatch(subScan, context, readers.iterator());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
new file mode 100644
index 0000000..d380207
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mongodb.BasicDBObject;
+
+public class MongoScanSpec {
+ private String dbName;
+ private String collectionName;
+
+ private BasicDBObject filters;
+
+ @JsonCreator
+ public MongoScanSpec(@JsonProperty("dbName") String dbName,
+ @JsonProperty("collectionName") String collectionName) {
+ this.dbName = dbName;
+ this.collectionName = collectionName;
+ }
+
+ public MongoScanSpec(String dbName, String collectionName,
+ BasicDBObject filters) {
+ this.dbName = dbName;
+ this.collectionName = collectionName;
+ this.filters = filters;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public BasicDBObject getFilters() {
+ return filters;
+ }
+
+ @Override
+ public String toString() {
+ return "MongoScanSpec [dbName=" + dbName + ", collectionName="
+ + collectionName + ", filters=" + filters + "]";
+ }
+
+}
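
MongoScanSpec is the selection payload that MongoStoragePlugin.getPhysicalScan() (below) deserializes, and its three-argument constructor exists so that the filter push-down rule can rebuild the spec with a Mongo query document attached. A small, hypothetical usage sketch follows; the database, collection, and filter values are invented, and it assumes it lives in the same package as the class above.

import com.mongodb.BasicDBObject;

// Hypothetical sketch of the two construction paths for MongoScanSpec.
public class ScanSpecSketch {
  public static void main(String[] args) {
    // Path 1: what the @JsonCreator constructor yields from a plan's selection;
    // filters remain null until push-down applies.
    MongoScanSpec initial = new MongoScanSpec("employee", "empinfo");
    System.out.println(initial);

    // Path 2: the spec rebuilt once a WHERE clause has been converted into a
    // Mongo query document (invented predicate shown here).
    BasicDBObject pushedFilter = new BasicDBObject("salary", new BasicDBObject("$gt", 50000));
    MongoScanSpec filtered = new MongoScanSpec("employee", "empinfo", pushedFilter);
    System.out.println(filtered);
  }
}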
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
new file mode 100644
index 0000000..e46d8ec
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.io.IOException;
+import java.util.Set;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+
+public class MongoStoragePlugin extends AbstractStoragePlugin {
+ static final Logger logger = LoggerFactory
+ .getLogger(MongoStoragePlugin.class);
+
+ private DrillbitContext context;
+ private MongoStoragePluginConfig mongoConfig;
+ private MongoSchemaFactory schemaFactory;
+
+ public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig,
+ DrillbitContext context, String name) throws IOException,
+ ExecutionSetupException {
+ this.context = context;
+ this.mongoConfig = mongoConfig;
+ this.schemaFactory = new MongoSchemaFactory(this, name);
+ }
+
+ public DrillbitContext getContext() {
+ return this.context;
+ }
+
+ @Override
+ public MongoStoragePluginConfig getConfig() {
+ return mongoConfig;
+ }
+
+ @Override
+ public void registerSchemas(UserSession session, SchemaPlus parent) {
+ schemaFactory.registerSchemas(session, parent);
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection)
+ throws IOException {
+ MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
+ new TypeReference<MongoScanSpec>() {
+ });
+ return new MongoGroupScan(this, mongoScanSpec, null);
+ }
+
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+ return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
new file mode 100644
index 0000000..b7cbf24
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientURI;
+
+@JsonTypeName(MongoStoragePluginConfig.NAME)
+public class MongoStoragePluginConfig extends StoragePluginConfig {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(MongoStoragePluginConfig.class);
+
+ public static final String NAME = "mongo";
+
+ private String connection;
+
+ @JsonIgnore
+ private MongoClientURI clientURI;
+
+ @JsonCreator
+ public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ } else if (that == null || getClass() != that.getClass()) {
+ return false;
+ }
+ MongoStoragePluginConfig thatConfig = (MongoStoragePluginConfig) that;
+ return this.connection.equals(thatConfig.connection);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return this.connection != null ? this.connection.hashCode() : 0;
+ }
+
+ @JsonIgnore
+ public MongoClientOptions getMongoOptions() {
+ MongoClientURI clientURI = new MongoClientURI(connection);
+ return clientURI.getOptions();
+ }
+
+ public String getConnection() {
+ return connection;
+ }
+}
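
Because the config is keyed by @JsonTypeName "mongo" and carries only a connection URI, registering the plugin comes down to a small JSON document. The example below is illustrative: the host, port, and the enabled flag inherited from the base plugin config are assumptions; the connection value can be any valid MongoClientURI string.

{
  "type": "mongo",
  "connection": "mongodb://localhost:27017/",
  "enabled": true
}

Note that getMongoOptions() derives the client options directly from the URI, so driver tuning such as read preference or pool size would be expressed as URI options in the connection string rather than as separate config fields.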
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
new file mode 100644
index 0000000..36008cf
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.mongodb.BasicDBObject;
+
+@JsonTypeName("mongo-shard-read")
+public class MongoSubScan extends AbstractBase implements SubScan {
+ static final Logger logger = LoggerFactory.getLogger(MongoSubScan.class);
+
+ @JsonProperty
+ private final MongoStoragePluginConfig mongoPluginConfig;
+ @JsonIgnore
+ private final MongoStoragePlugin mongoStoragePlugin;
+ private final List<SchemaPath> columns;
+
+ private final List<MongoSubScanSpec> chunkScanSpecList;
+
+ @JsonCreator
+ public MongoSubScan(
+ @JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
+ @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+ @JsonProperty("columns") List<SchemaPath> columns)
+ throws ExecutionSetupException {
+ this.columns = columns;
+ this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
+ this.mongoStoragePlugin = (MongoStoragePlugin) registry
+ .getPlugin(mongoPluginConfig);
+ this.chunkScanSpecList = chunkScanSpecList;
+ }
+
+ public MongoSubScan(MongoStoragePlugin storagePlugin,
+ MongoStoragePluginConfig storagePluginConfig,
+ List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+ this.mongoStoragePlugin = storagePlugin;
+ this.mongoPluginConfig = storagePluginConfig;
+ this.columns = columns;
+ this.chunkScanSpecList = chunkScanSpecList;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @JsonIgnore
+ public MongoStoragePluginConfig getMongoPluginConfig() {
+ return mongoPluginConfig;
+ }
+
+ @JsonIgnore
+ public MongoStoragePlugin getMongoStoragePlugin() {
+ return mongoStoragePlugin;
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ public List<MongoSubScanSpec> getChunkScanSpecList() {
+ return chunkScanSpecList;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+ throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig,
+ chunkScanSpecList, columns);
+ }
+
+ @Override
+ public int getOperatorType() {
+ return 1009;
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+ public static class MongoSubScanSpec {
+
+ protected String dbName;
+ protected String collectionName;
+ protected List<String> hosts;
+ protected Map<String, Object> minFilters;
+ protected Map<String, Object> maxFilters;
+
+ protected BasicDBObject filter;
+
+ @JsonCreator
+ public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+ @JsonProperty("collectionName") String collectionName,
+ @JsonProperty("hosts") List<String> hosts,
+ @JsonProperty("minFilters") Map<String, Object> minFilters,
+ @JsonProperty("maxFilters") Map<String, Object> maxFilters,
+ @JsonProperty("filters") BasicDBObject filters) {
+ this.dbName = dbName;
+ this.collectionName = collectionName;
+ this.hosts = hosts;
+ this.minFilters = minFilters;
+ this.maxFilters = maxFilters;
+ this.filter = filters;
+ }
+
+ MongoSubScanSpec() {
+
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public MongoSubScanSpec setDbName(String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public MongoSubScanSpec setCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ return this;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public MongoSubScanSpec setHosts(List<String> hosts) {
+ this.hosts = hosts;
+ return this;
+ }
+
+ public Map<String, Object> getMinFilters() {
+ return minFilters;
+ }
+
+ public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
+ this.minFilters = minFilters;
+ return this;
+ }
+
+ public Map<String, Object> getMaxFilters() {
+ return maxFilters;
+ }
+
+ public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
+ this.maxFilters = maxFilters;
+ return this;
+ }
+
+ public BasicDBObject getFilter() {
+ return filter;
+ }
+
+ public MongoSubScanSpec setFilter(BasicDBObject filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "MongoSubScanSpec [dbName=" + dbName + ", collectionName="
+ + collectionName + ", hosts=" + hosts + ", minFilters=" + minFilters
+ + ", maxFilters=" + maxFilters + ", filter=" + filter + "]";
+ }
+
+ }
+
+}
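
Finally, a hypothetical sketch of how a per-chunk MongoSubScanSpec is assembled with the fluent setters above. The host, database, collection, and shard-key bounds are invented, and because the no-arg constructor is package-private the sketch assumes it lives in org.apache.drill.exec.store.mongo.

import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

// Hypothetical sketch: one sub-scan spec per chunk, of the kind MongoGroupScan
// is expected to build before handing the list to MongoSubScan.
public class SubScanSpecSketch {
  public static void main(String[] args) {
    Map<String, Object> min = ImmutableMap.<String, Object>of("empid", 1000);
    Map<String, Object> max = ImmutableMap.<String, Object>of("empid", 2000);

    MongoSubScan.MongoSubScanSpec spec = new MongoSubScan.MongoSubScanSpec()
        .setDbName("employee")
        .setCollectionName("empinfo")
        .setHosts(ImmutableList.of("localhost:27017"))
        .setMinFilters(min)
        .setMaxFilters(max);

    // MongoRecordReader later merges these bounds (via MongoUtils.mergeFilters)
    // with any pushed-down filter before opening its cursor.
    System.out.println(spec);
  }
}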