Posted to commits@drill.apache.org by am...@apache.org on 2018/10/25 23:08:58 UTC
[drill] 02/08: DRILL-6381: (Part 2) MapRDB plugin update to 6.0.1
This is an automated email from the ASF dual-hosted git repository.
amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit a4f62e9b5d0e69585b91042236c1ca68e1d628bc
Author: rebase <bu...@mapr.com>
AuthorDate: Mon Feb 12 14:31:49 2018 -0800
DRILL-6381: (Part 2) MapRDB plugin update to 6.0.1
1. MD-3960: Update Drill to build with MapR-6.0.1 libraries
2. MD-3995: Do not push down limit 0 past a project with CONVERT_FROMJSON
3. MD-4054: Restricted scan limit is changed to dynamically read rows using the row count of the right side instead of the fixed 4096.
4. MD-3688: Impersonating a view owner doesn't work with security disabled in 6.0
5. MD-4492: Missing limit pushdown changes in JsonTableGroupScan
Co-authored-by: chunhui-shi <cs...@maprtech.com>
Co-authored-by: Gautam Parai <gp...@maprtech.com>
Co-authored-by: Vlad Rozov <vr...@mapr.com>
Conflicts:
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
pom.xml
---
contrib/format-maprdb/pom.xml | 89 ++++
.../planner/index/MapRDBFunctionalIndexInfo.java | 163 +++++++
.../drill/exec/planner/index/MapRDBStatistics.java | 337 +++++++++++++
.../planner/index/MapRDBStatisticsPayload.java | 56 +++
.../drill/exec/store/mapr/PluginConstants.java | 90 ++++
.../drill/exec/store/mapr/PluginErrorHandler.java | 50 ++
.../drill/exec/store/mapr/db/MapRDBCost.java | 104 ++++
.../exec/store/mapr/db/MapRDBFormatMatcher.java | 4 +-
.../exec/store/mapr/db/MapRDBFormatPlugin.java | 67 ++-
.../store/mapr/db/MapRDBFormatPluginConfig.java | 4 +
.../drill/exec/store/mapr/db/MapRDBGroupScan.java | 75 ++-
.../store/mapr/db/MapRDBPushFilterIntoScan.java | 3 +-
.../store/mapr/db/MapRDBPushLimitIntoScan.java | 157 ++++++
.../store/mapr/db/MapRDBPushProjectIntoScan.java | 141 ++++++
.../mapr/db/MapRDBRestrictedScanBatchCreator.java | 50 ++
.../exec/store/mapr/db/MapRDBScanBatchCreator.java | 6 +-
.../drill/exec/store/mapr/db/MapRDBSubScan.java | 23 +-
.../exec/store/mapr/db/MapRDBSubScanSpec.java | 54 ++-
.../drill/exec/store/mapr/db/MapRDBTableCache.java | 232 +++++++++
.../store/mapr/db/RestrictedMapRDBSubScan.java | 80 +++
.../store/mapr/db/RestrictedMapRDBSubScanSpec.java | 219 +++++++++
.../store/mapr/db/binary/BinaryTableGroupScan.java | 34 +-
.../store/mapr/db/json/AllTextValueWriter.java | 80 +++
.../mapr/db/json/CompareFunctionsProcessor.java | 15 +-
.../mapr/db/json/DocumentReaderVectorWriter.java | 42 ++
.../exec/store/mapr/db/json/FieldPathHelper.java | 75 +++
.../mapr/db/json/FieldTransferVectorWriter.java | 49 ++
.../store/mapr/db/json/IdOnlyVectorWriter.java | 65 +++
.../store/mapr/db/json/JsonConditionBuilder.java | 59 ++-
.../exec/store/mapr/db/json/JsonScanSpec.java | 93 +++-
.../exec/store/mapr/db/json/JsonSubScanSpec.java | 69 +--
.../store/mapr/db/json/JsonTableGroupScan.java | 485 ++++++++++++++++---
.../store/mapr/db/json/MaprDBJsonRecordReader.java | 538 ++++++++-------------
.../mapr/db/json/NumbersAsDoubleValueWriter.java | 51 ++
.../store/mapr/db/json/OjaiFunctionsProcessor.java | 214 ++++++++
.../exec/store/mapr/db/json/OjaiValueWriter.java | 194 ++++++++
.../db/json/ProjectionPassthroughVectorWriter.java | 83 ++++
.../mapr/db/json/RestrictedJsonRecordReader.java | 248 ++++++++++
.../store/mapr/db/json/RowCountVectorWriter.java | 40 ++
.../exec/udf/mapr/db/ConditionPlaceholder.java | 54 +++
.../drill/exec/udf/mapr/db/DecodeFieldPath.java | 65 +++
.../drill/exec/udf/mapr/db/MatchesPlaceholder.java | 54 +++
.../exec/udf/mapr/db/NotMatchesPlaceholder.java | 54 +++
.../exec/udf/mapr/db/NotTypeOfPlaceholder.java | 54 +++
.../drill/exec/udf/mapr/db/SizeOfPlaceholder.java | 55 +++
.../drill/exec/udf/mapr/db/TypeOfPlaceholder.java | 54 +++
.../src/main/resources/drill-module.conf | 20 +-
.../mapr/drill/maprdb/tests/MaprDBTestsSuite.java | 63 +--
.../mapr/drill/maprdb/tests/json/BaseJsonTest.java | 13 +
.../maprdb/tests/json/TestEncodedFieldPaths.java | 128 +++++
.../maprdb/tests/json/TestFieldPathHelper.java | 52 ++
.../drill/maprdb/tests/json/TestScanRanges.java | 158 ++++++
.../drill/maprdb/tests/json/TestSimpleJson.java | 216 +++++----
.../{ => com/mapr/drill}/json/business.json | 0
.../mapr/drill/json/encoded_fields_userdata.json | 5 +
.../resources/{hbase-site.xml => core-site.xml} | 5 +
.../exec/physical/base/AbstractGroupScan.java | 6 +
.../apache/drill/exec/physical/base/GroupScan.java | 3 +
.../drill/exec/planner/physical/PrelUtil.java | 231 +++++++++
.../java/org/apache/drill/test/BaseTestQuery.java | 12 +-
pom.xml | 4 +-
61 files changed, 5079 insertions(+), 665 deletions(-)
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index 816eafb..bfc3131 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -83,6 +83,41 @@
</systemProperties>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <!-- do not package test configurations in the jar files as they can
+ override the installed ones -->
+ <exclude>**/core-site.xml</exclude>
+ <exclude>**/logback.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <!-- we need the source plugin for the UDFs -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>add-sources-as-resources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+ <directory>src/main/java</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -132,6 +167,14 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -140,6 +183,24 @@
</dependency>
<!-- Test dependencies -->
<dependency>
+ <groupId>com.mapr.db</groupId>
+ <artifactId>maprdb</artifactId>
+ <version>${mapr.release.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>com.mapr.hadoop</groupId>
+ <artifactId>maprfs</artifactId>
+ <version>${mapr.release.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>0.1.54</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.mapr</groupId>
<artifactId>mapr-java-utils</artifactId>
<version>${mapr.release.version}</version>
@@ -157,6 +218,16 @@
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
@@ -169,6 +240,14 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -177,6 +256,16 @@
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
new file mode 100644
index 0000000..01561a3
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Map;
+import java.util.Set;
+
+public class MapRDBFunctionalIndexInfo implements FunctionalIndexInfo {
+
+ final private IndexDescriptor indexDesc;
+
+ private boolean hasFunctionalField = false;
+
+ // When we scan a schemaPath in the group scan's columns, we check whether this column (schemaPath)
+ // should be rewritten to '$N'. When there is more than one function on the same column in the index,
+ // e.g. CAST(a.b as INT) and CAST(a.b as VARCHAR), we map SchemaPath a.b to a set of SchemaPaths, e.g. $1, $2
+ private Map<SchemaPath, Set<SchemaPath>> columnToConvert;
+
+ // map of functional index expression to destination SchemaPath e.g. $N
+ private Map<LogicalExpression, LogicalExpression> exprToConvert;
+
+ // map of the SchemaPaths involved in a functional field
+ private Map<LogicalExpression, Set<SchemaPath>> pathsInExpr;
+
+ private Set<SchemaPath> newPathsForIndexedFunction;
+
+ private Set<SchemaPath> allPathsInFunction;
+
+ public MapRDBFunctionalIndexInfo(IndexDescriptor indexDesc) {
+ this.indexDesc = indexDesc;
+ columnToConvert = Maps.newHashMap();
+ exprToConvert = Maps.newHashMap();
+ pathsInExpr = Maps.newHashMap();
+ // preserve the insertion order of new paths; it may matter for the naming policy
+ newPathsForIndexedFunction = Sets.newLinkedHashSet();
+ allPathsInFunction = Sets.newHashSet();
+ init();
+ }
+
+ private void init() {
+ int count = 0;
+ for(LogicalExpression indexedExpr : indexDesc.getIndexColumns()) {
+ if( !(indexedExpr instanceof SchemaPath) ) {
+ hasFunctionalField = true;
+ SchemaPath functionalFieldPath = SchemaPath.getSimplePath("$"+count);
+ newPathsForIndexedFunction.add(functionalFieldPath);
+
+ // for now we handle only cast expressions
+ if (indexedExpr instanceof CastExpression) {
+ // and, within those, only CAST applied directly to a SchemaPath
+ SchemaPath pathBeingCasted = (SchemaPath)((CastExpression) indexedExpr).getInput();
+ addTargetPathForOriginalPath(pathBeingCasted, functionalFieldPath);
+ addPathInExpr(indexedExpr, pathBeingCasted);
+ exprToConvert.put(indexedExpr, functionalFieldPath);
+ allPathsInFunction.add(pathBeingCasted);
+ }
+
+ count++;
+ }
+ }
+ }
+
+ private void addPathInExpr(LogicalExpression expr, SchemaPath path) {
+ if (!pathsInExpr.containsKey(expr)) {
+ Set<SchemaPath> newSet = Sets.newHashSet();
+ newSet.add(path);
+ pathsInExpr.put(expr, newSet);
+ }
+ else {
+ pathsInExpr.get(expr).add(path);
+ }
+ }
+
+ private void addTargetPathForOriginalPath(SchemaPath origPath, SchemaPath newPath) {
+ if (!columnToConvert.containsKey(origPath)) {
+ Set<SchemaPath> newSet = Sets.newHashSet();
+ newSet.add(newPath);
+ columnToConvert.put(origPath, newSet);
+ }
+ else {
+ columnToConvert.get(origPath).add(newPath);
+ }
+ }
+
+
+ public boolean hasFunctional() {
+ return hasFunctionalField;
+ }
+
+ public IndexDescriptor getIndexDesc() {
+ return indexDesc;
+ }
+
+ /**
+ * For an original path, return the renamed '$N' path. Note there could be multiple renamed paths
+ * if multiple functional indexes refer to the original path; this returns the first one.
+ * @param path the original path
+ * @return the renamed '$N' path, or null if the path is not part of a functional index
+ */
+ public SchemaPath getNewPath(SchemaPath path) {
+ if(columnToConvert.containsKey(path)) {
+ return columnToConvert.get(path).iterator().next();
+ }
+ return null;
+ }
+
+ /**
+ * Return the plain field path that replaces the incoming index expression 'expr', if any.
+ * @param expr expected to be an indexed expression
+ * @return the renamed SchemaPath in the index table for the indexed expression, or null if none
+ */
+ public SchemaPath getNewPathFromExpr(LogicalExpression expr) {
+ if(exprToConvert.containsKey(expr)) {
+ return (SchemaPath)exprToConvert.get(expr);
+ }
+ return null;
+ }
+
+ /**
+ * @return the map of indexed expression --> the schema paths involved in an indexed expression
+ */
+ public Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr() {
+ return pathsInExpr;
+ }
+
+
+ public Map<LogicalExpression, LogicalExpression> getExprMap() {
+ return exprToConvert;
+ }
+
+ public Set<SchemaPath> allNewSchemaPaths() {
+ return newPathsForIndexedFunction;
+ }
+
+ public Set<SchemaPath> allPathsInFunction() {
+ return allPathsInFunction;
+ }
+
+ public boolean supportEqualCharConvertToLike() {
+ return true;
+ }
+}
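For orientation, a hypothetical usage sketch of the class above; building a real IndexDescriptor is elided here (indexDesc is assumed to describe an index on CAST(a.b AS INT)), and only methods defined in this file are exercised:

  MapRDBFunctionalIndexInfo info = new MapRDBFunctionalIndexInfo(indexDesc);
  if (info.hasFunctional()) {
    // a.b was rewritten to a generated field such as $0 in the index table
    SchemaPath renamed = info.getNewPath(SchemaPath.getCompoundPath("a", "b"));
  }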
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
new file mode 100644
index 0000000..dc89a4d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.ojai.store.QueryCondition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MapRDBStatistics implements Statistics {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBStatistics.class);
+ static final String nullConditionAsString = "<NULL>";
+ private double rowKeyJoinBackIOFactor = 1.0;
+ private boolean statsAvailable = false;
+ private StatisticsPayload fullTableScanPayload = null;
+ /*
+ * The computed statistics are cached in <statsCache> so that subsequent calls are served
+ * from the cache. The <statsCache> is a map of <RexNode, map<Index, Stats Payload>>. The <RexNode>
+ * does not have a comparator, so it is converted to a String to serve as a Map key. This may cause
+ * logically equivalent conditions to be treated as distinct, e.g. sal<10 OR sal>100 vs. sal>100 OR sal<10.
+ * The second map maintains statistics per index, since not all statistics are independent of the index,
+ * e.g. average row size.
+ */
+ private Map<String, Map<String, StatisticsPayload>> statsCache;
+ /*
+ * The filter independent computed statistics are cached in <fIStatsCache> so that any subsequent
+ * calls are returned from the cache. The <fIStatsCache> is a map of <Index, Stats Payload>. This
+ * cache maintains statistics per index as not all statistics are independent of the index
+ * e.g. average row size.
+ */
+ private Map<String, StatisticsPayload> fIStatsCache;
+ /*
+ * The mapping between <QueryCondition> and <RexNode> is kept in <conditionRexNodeMap>. This mapping
+ * is useful to obtain rowCount for condition specified as <QueryCondition> required during physical
+ * planning. Again, both the <QueryCondition> and <RexNode> are converted to Strings for the lack
+ * of a comparator.
+ */
+ private Map<String, String> conditionRexNodeMap;
+
+ public MapRDBStatistics() {
+ statsCache = new HashMap<>();
+ fIStatsCache = new HashMap<>();
+ conditionRexNodeMap = new HashMap<>();
+ }
+
+ public double getRowKeyJoinBackIOFactor() {
+ return rowKeyJoinBackIOFactor;
+ }
+
+ @Override
+ public boolean isStatsAvailable() {
+ return statsAvailable;
+ }
+
+ @Override
+ public String buildUniqueIndexIdentifier(IndexDescriptor idx) {
+ if (idx == null) {
+ return null;
+ } else {
+ return idx.getTableName() + "_" + idx.getIndexName();
+ }
+ }
+
+ public String buildUniqueIndexIdentifier(String tableName, String idxName) {
+ if (tableName == null || idxName == null) {
+ return null;
+ } else {
+ return tableName + "_" + idxName;
+ }
+ }
+
+ /** Returns the number of rows satisfying the given FILTER condition
+ * @param condition - FILTER specified as a {@link RexNode}
+ * @param tabIdxName - The table/index identifier
+ * @param scanRel - The current scanRel
+ * @return approximate rows satisfying the filter
+ */
+ @Override
+ public double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel) {
+ String conditionAsStr = nullConditionAsString;
+ Map<String, StatisticsPayload> payloadMap;
+ if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan)
+ || (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) {
+ if (condition == null && fullTableScanPayload != null) {
+ return fullTableScanPayload.getRowCount();
+ } else if (condition != null) {
+ conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+ payloadMap = statsCache.get(conditionAsStr);
+ if (payloadMap != null) {
+ if (payloadMap.get(tabIdxName) != null) {
+ return payloadMap.get(tabIdxName).getRowCount();
+ } else {
+ // We might not have computed rowcount for the given condition from the tab/index in question.
+ // For rowcount it does not matter which index was used to get the rowcount for the given condition.
+ // Hence, just use the first one!
+ for (String payloadKey : payloadMap.keySet()) {
+ if (payloadKey != null && payloadMap.get(payloadKey) != null) {
+ return payloadMap.get(payloadKey).getRowCount();
+ }
+ }
+ StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue();
+ return anyPayload.getRowCount();
+ }
+ }
+ }
+ }
+ if (statsAvailable) {
+ logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
+ }
+ return ROWCOUNT_UNKNOWN;
+ }
+
+ /** Returns the number of rows satisfying the given FILTER condition
+ * @param condition - FILTER specified as a {@link QueryCondition}
+ * @param tabIdxName - The table/index identifier
+ * @return approximate rows satisfying the filter
+ */
+ public double getRowCount(QueryCondition condition, String tabIdxName) {
+ String conditionAsStr = nullConditionAsString;
+ Map<String, StatisticsPayload> payloadMap;
+ if (condition != null
+ && conditionRexNodeMap.get(condition.toString()) != null) {
+ String rexConditionAsString = conditionRexNodeMap.get(condition.toString());
+ payloadMap = statsCache.get(rexConditionAsString);
+ if (payloadMap != null) {
+ if (payloadMap.get(tabIdxName) != null) {
+ return payloadMap.get(tabIdxName).getRowCount();
+ } else {
+ // We might not have computed the rowcount for the given condition from the tab/index in question.
+ // For rowcount it does not matter which index was used to get the rowcount for the given condition.
+ // If tabIdxName were null, we would most likely have found a payload above and never reached here;
+ // since we did reach here, we are looking for an index's payload, so just use any index's payload.
+ for (String payloadKey : payloadMap.keySet()) {
+ if (payloadKey != null && payloadMap.get(payloadKey) != null) {
+ return payloadMap.get(payloadKey).getRowCount();
+ }
+ }
+ StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue();
+ return anyPayload.getRowCount();
+ }
+ }
+ } else if (condition == null
+ && fullTableScanPayload != null) {
+ return fullTableScanPayload.getRowCount();
+ }
+ if (condition != null) {
+ conditionAsStr = condition.toString();
+ }
+ if (statsAvailable) {
+ logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
+ }
+ return ROWCOUNT_UNKNOWN;
+ }
+
+ /** Returns the number of leading rows satisfying the given FILTER condition
+ * @param condition - FILTER specified as a {@link RexNode}
+ * @param tabIdxName - The table/index identifier
+ * @param scanRel - The current scanRel
+ * @return approximate rows satisfying the leading filter
+ */
+ @Override
+ public double getLeadingRowCount(RexNode condition, String tabIdxName, RelNode scanRel) {
+ String conditionAsStr = nullConditionAsString;
+ Map<String, StatisticsPayload> payloadMap;
+ if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan)
+ || (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) {
+ if (condition == null && fullTableScanPayload != null) {
+ return fullTableScanPayload.getLeadingRowCount();
+ } else if (condition != null) {
+ conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+ payloadMap = statsCache.get(conditionAsStr);
+ if (payloadMap != null) {
+ if (payloadMap.get(tabIdxName) != null) {
+ return payloadMap.get(tabIdxName).getLeadingRowCount();
+ }
+ // Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is
+ // not found, we are out of luck!
+ }
+ }
+ }
+ if (statsAvailable) {
+ logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr);
+ }
+ return ROWCOUNT_UNKNOWN;
+ }
+
+ /** Returns the number of leading rows satisfying the given FILTER condition
+ * @param condition - FILTER specified as a {@link QueryCondition}
+ * @param tabIdxName - The table/index identifier
+ * @return approximate rows satisfying the leading filter
+ */
+ public double getLeadingRowCount(QueryCondition condition, String tabIdxName) {
+ String conditionAsStr = nullConditionAsString;
+ Map<String, StatisticsPayload> payloadMap;
+ if (condition != null
+ && conditionRexNodeMap.get(condition.toString()) != null) {
+ String rexConditionAsString = conditionRexNodeMap.get(condition.toString());
+ payloadMap = statsCache.get(rexConditionAsString);
+ if (payloadMap != null) {
+ if (payloadMap.get(tabIdxName) != null) {
+ return payloadMap.get(tabIdxName).getLeadingRowCount();
+ }
+ // Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is
+ // not found, we are out of luck!
+ }
+ } else if (condition == null
+ && fullTableScanPayload != null) {
+ return fullTableScanPayload.getLeadingRowCount();
+ }
+ if (condition != null) {
+ conditionAsStr = condition.toString();
+ }
+ if (statsAvailable) {
+ logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr);
+ }
+ return ROWCOUNT_UNKNOWN;
+ }
+
+ @Override
+ public double getAvgRowSize(String tabIdxName, boolean isTableScan) {
+ StatisticsPayload payloadMap;
+ if (isTableScan && fullTableScanPayload != null) {
+ return fullTableScanPayload.getAvgRowSize();
+ } else if (!isTableScan) {
+ payloadMap = fIStatsCache.get(tabIdxName);
+ if (payloadMap != null) {
+ return payloadMap.getAvgRowSize();
+ }
+ }
+ if (statsAvailable) {
+ logger.debug("Statistics: Average row size is UNKNOWN for table: {}", tabIdxName);
+ }
+ return AVG_ROWSIZE_UNKNOWN;
+ }
+
+ public boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context) {
+ //XXX to implement for complete secondary index framework
+ return false;
+ }
+
+ /*
+ * Convert the given RexNode to a String representation, replacing RexInputRef references
+ * with actual column names. Since we compare String representations of RexNodes, two equivalent
+ * RexNode expressions might differ only in their RexInputRef positions,
+ * e.g. $1 = 'CA' with projection (State, Country) vs. $2 = 'CA' with projection (Country, State).
+ */
+ private String convertRexToString(RexNode condition, RelDataType rowType) {
+ StringBuilder sb = new StringBuilder();
+ if (condition == null) {
+ return null;
+ }
+ if (condition.getKind() == SqlKind.AND) {
+ boolean first = true;
+ for(RexNode pred : RelOptUtil.conjunctions(condition)) {
+ if (first) {
+ sb.append(convertRexToString(pred, rowType));
+ first = false;
+ } else {
+ sb.append(" " + SqlKind.AND.toString() + " ");
+ sb.append(convertRexToString(pred, rowType));
+ }
+ }
+ return sb.toString();
+ } else if (condition.getKind() == SqlKind.OR) {
+ boolean first = true;
+ for(RexNode pred : RelOptUtil.disjunctions(condition)) {
+ if (first) {
+ sb.append(convertRexToString(pred, rowType));
+ first = false;
+ } else {
+ sb.append(" " + SqlKind.OR.toString() + " ");
+ sb.append(convertRexToString(pred, rowType));
+ }
+ }
+ return sb.toString();
+ } else {
+ HashMap<String, String> inputRefMapping = new HashMap<>();
+ /* Based on the rel projection the input reference for the same column may change
+ * during planning. We want the cache to be agnostic to it. Hence, the entry stored
+ * in the cache has the input reference ($i) replaced with the column name
+ */
+ getInputRefMapping(condition, rowType, inputRefMapping);
+ if (inputRefMapping.keySet().size() > 0) {
+ //Found input ref - replace it
+ String replCondition = condition.toString();
+ for (String inputRef : inputRefMapping.keySet()) {
+ replCondition = replCondition.replace(inputRef, inputRefMapping.get(inputRef));
+ }
+ return replCondition;
+ } else {
+ return condition.toString();
+ }
+ }
+ }
+
+ /*
+ * Generate the input reference to column mapping for reference replacement. Please
+ * look at the usage in convertRexToString() to understand why this mapping is required.
+ */
+ private void getInputRefMapping(RexNode condition, RelDataType rowType,
+ HashMap<String, String> mapping) {
+ if (condition instanceof RexCall) {
+ for (RexNode op : ((RexCall) condition).getOperands()) {
+ getInputRefMapping(op, rowType, mapping);
+ }
+ } else if (condition instanceof RexInputRef) {
+ mapping.put(condition.toString(),
+ rowType.getFieldNames().get(condition.hashCode()));
+ }
+ }
+}
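The class comment above notes that RexNode conditions are keyed by their String form because RexNode has no comparator. A self-contained sketch of the pitfall that comment warns about, using plain maps and made-up digest strings:

  import java.util.HashMap;
  import java.util.Map;

  public class StringKeyedCacheSketch {
    public static void main(String[] args) {
      Map<String, Double> statsCache = new HashMap<>();
      // cache a row count under one stringified form of the predicate
      statsCache.put("OR(<($0, 10), >($0, 100))", 42.0);
      // the logically equivalent flipped form misses the cache
      System.out.println(statsCache.get("OR(>($0, 100), <($0, 10))")); // null
    }
  }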
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatisticsPayload.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatisticsPayload.java
new file mode 100644
index 0000000..4930282
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatisticsPayload.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+public class MapRDBStatisticsPayload implements StatisticsPayload {
+
+ double rowCount;
+ double leadingRowCount;
+ double avgRowSize;
+
+ public MapRDBStatisticsPayload(double rowCount, double leadingRowCount, double avgRowSize) {
+ this.rowCount = rowCount;
+ this.leadingRowCount = leadingRowCount;
+ this.avgRowSize = avgRowSize;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRDBStatisticsPayload{" +
+ "rowCount=" + rowCount +
+ ", leadingRowCount=" + leadingRowCount +
+ ", avgRowSize=" + avgRowSize +
+ '}';
+ }
+
+ @Override
+ public double getRowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public double getLeadingRowCount() {
+ return leadingRowCount;
+ }
+
+ @Override
+ public double getAvgRowSize() {
+ return avgRowSize;
+ }
+
+}
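A quick usage sketch of the payload class above (the numbers are made up):

  StatisticsPayload payload = new MapRDBStatisticsPayload(100000.0, 2500.0, 64.0);
  double rows = payload.getRowCount();           // 100000.0
  double leading = payload.getLeadingRowCount(); // 2500.0, index-dependent
  double avgSize = payload.getAvgRowSize();      // 64.0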
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
new file mode 100644
index 0000000..4239c5d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr;
+
+import static org.ojai.DocumentConstants.ID_KEY;
+import org.apache.drill.common.expression.SchemaPath;
+import com.mapr.db.DBConstants;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.PluginCost.CheckValid;
+
+public class PluginConstants {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PluginConstants.class);
+
+ public final static CheckValid alwaysValid = new CheckValid<Integer>() {
+ @Override
+ public boolean isValid(Integer parameter) {
+ return true;
+ }
+ };
+
+ public final static CheckValid isNonNegative = new CheckValid<Integer>() {
+ @Override
+ public boolean isValid(Integer paramValue) {
+ if (paramValue > 0 && paramValue <= Integer.MAX_VALUE) {
+ return true;
+ } else {
+ logger.warn("Setting default value as the supplied parameter value is less than/equals to 0");
+ return false;
+ }
+ }
+ };
+
+ public static final String SSD = "SSD";
+ public static final String HDD = "HDD";
+ public static final SchemaPath ID_SCHEMA_PATH = SchemaPath.getSimplePath(ID_KEY);
+
+ public static final SchemaPath DOCUMENT_SCHEMA_PATH = SchemaPath.getSimplePath(DBConstants.DOCUMENT_FIELD);
+
+ public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
+
+ public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
+ public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
+
+ public static final String JSON_TABLE_RESTRICTED_SCAN_SIZE_MB = "format-maprdb.json.restrictedScanSizeMB";
+ public static final int JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT = 4096;
+
+ public static final String JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING = "format-maprdb.json.useNumRegionsForDistribution";
+ public static final boolean JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING_DEFAULT = false;
+
+ public static final String JSON_TABLE_BLOCK_SIZE = "format-maprdb.json.pluginCost.blockSize";
+ public static final int JSON_TABLE_BLOCK_SIZE_DEFAULT = 8192;
+
+ public static final String JSON_TABLE_MEDIA_TYPE = "format-maprdb.json.mediaType";
+ public static final String JSON_TABLE_MEDIA_TYPE_DEFAULT = SSD;
+
+ public static final String JSON_TABLE_SSD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.ssdBlockSequentialReadCost";
+ public static final int JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * JSON_TABLE_BLOCK_SIZE_DEFAULT;
+
+ // for SSD, random and sequential costs are the same
+ public static final String JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.ssdBlockRandomReadCost";
+ public static final int JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+
+ public static final String JSON_TABLE_AVERGE_COLUMN_SIZE = "format-maprdb.json.pluginCost.averageColumnSize";
+ public static final int JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+
+ public static final int TABLE_BLOCK_SIZE_DEFAULT = 8192;
+ public static final int TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * TABLE_BLOCK_SIZE_DEFAULT;
+ public static final int TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+ public static final int TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+ public static final String JSON_TABLE_HDD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.hddBlockSequentialReadCost";
+ public static final int JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT = 6 * JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+
+ public static final String JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.hddBlockRandomReadCost";
+ public static final int JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT = 1000 * JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT;
+}
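The two CheckValid validators above gate the config values read by MapRDBCost (below). A small sketch of their behavior, calling them through the raw-typed fields as declared:

  // Despite its name, isNonNegative requires a strictly positive value;
  // zero or a negative value makes the caller fall back to the default.
  boolean rejected = PluginConstants.isNonNegative.isValid(0);  // false, logs a warning
  boolean accepted = PluginConstants.alwaysValid.isValid(-5);   // true, anything goes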
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
new file mode 100644
index 0000000..d106d6e
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.slf4j.Logger;
+
+public final class PluginErrorHandler {
+
+ public static UserException unsupportedError(Logger logger, String format, Object... args) {
+ return UserException.unsupportedError()
+ .message(String.format(format, args))
+ .build(logger);
+ }
+
+ public static UserException dataReadError(Logger logger, Throwable t) {
+ return dataReadError(logger, t, null);
+ }
+
+ public static UserException dataReadError(Logger logger, String format, Object... args) {
+ return dataReadError(logger, null, format, args);
+ }
+
+ public static UserException dataReadError(Logger logger, Throwable t, String format, Object... args) {
+ return UserException.dataReadError(t)
+ .message(format == null ? null : String.format(format, args))
+ .build(logger);
+ }
+
+ public static SchemaChangeException schemaChangeException(Logger logger, Throwable t, String format, Object... args) {
+ return new SchemaChangeException(format, t, args);
+ }
+
+}
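A hedged usage sketch of the helpers above; readDocument, logger, and tableName are hypothetical stand-ins:

  try {
    readDocument();  // hypothetical reader call
  } catch (IOException e) {
    // wraps the cause in a Drill UserException and logs it
    throw PluginErrorHandler.dataReadError(logger, e,
        "Error reading row from table '%s'", tableName);
  }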
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBCost.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBCost.java
new file mode 100644
index 0000000..0c40a02
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBCost.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.store.mapr.PluginConstants;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+
+public class MapRDBCost implements PluginCost {
+
+ private int JSON_AVG_COLUMN_SIZE;
+ private int JSON_TABLE_BLOCK_SIZE; // bytes per block
+ private int JSON_BLOCK_SEQ_READ_COST;
+ private int JSON_BLOCK_RANDOM_READ_COST;
+ private int JSON_HDD_BLOCK_SEQ_READ_COST;
+ private int JSON_HDD_BLOCK_RANDOM_READ_COST;
+ private int JSON_SSD_BLOCK_SEQ_READ_COST;
+ private int JSON_SSD_BLOCK_RANDOM_READ_COST;
+
+ public MapRDBCost(DrillConfig config, String mediaType) {
+ JSON_AVG_COLUMN_SIZE = setConfigValue(config, PluginConstants.JSON_TABLE_AVERGE_COLUMN_SIZE,
+ PluginConstants.JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT, PluginConstants.alwaysValid);
+ JSON_TABLE_BLOCK_SIZE = setConfigValue(config, PluginConstants.JSON_TABLE_BLOCK_SIZE,
+ PluginConstants.JSON_TABLE_BLOCK_SIZE_DEFAULT, PluginConstants.alwaysValid);
+ JSON_SSD_BLOCK_SEQ_READ_COST = setConfigValue(config, PluginConstants.JSON_TABLE_SSD_BLOCK_SEQ_READ_COST,
+ PluginConstants.JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT, PluginConstants.isNonNegative);
+ JSON_SSD_BLOCK_RANDOM_READ_COST = setConfigValue(config, PluginConstants.JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST,
+ PluginConstants.JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT, new greaterThanEquals(JSON_SSD_BLOCK_SEQ_READ_COST));
+ JSON_HDD_BLOCK_SEQ_READ_COST = setConfigValue(config, PluginConstants.JSON_TABLE_HDD_BLOCK_SEQ_READ_COST,
+ PluginConstants.JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT, PluginConstants.isNonNegative);
+ JSON_HDD_BLOCK_RANDOM_READ_COST = setConfigValue(config, PluginConstants.JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST,
+ PluginConstants.JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT, new greaterThanEquals(JSON_HDD_BLOCK_SEQ_READ_COST));
+ JSON_BLOCK_SEQ_READ_COST = mediaType.equals(PluginConstants.SSD) ? JSON_SSD_BLOCK_SEQ_READ_COST :
+ JSON_HDD_BLOCK_SEQ_READ_COST;
+ JSON_BLOCK_RANDOM_READ_COST = mediaType.equals(PluginConstants.SSD) ? JSON_SSD_BLOCK_RANDOM_READ_COST :
+ JSON_HDD_BLOCK_RANDOM_READ_COST;
+ }
+
+ private int setConfigValue(DrillConfig config, String configPath,
+ int defaultValue, CheckValid check) {
+ int configValue;
+ try {
+ configValue = config.getInt(configPath);
+ if (!check.isValid(configValue)) { configValue = defaultValue; }
+ } catch (Exception ex) {
+ // Use defaults, if config values not present or any other issue
+ configValue = defaultValue;
+ }
+ return configValue;
+ }
+
+ @Override
+ public int getAverageColumnSize(GroupScan scan) {
+ if (scan instanceof JsonTableGroupScan) {
+ return JSON_AVG_COLUMN_SIZE;
+ } else {
+ return PluginConstants.TABLE_AVERGE_COLUMN_SIZE_DEFAULT;
+ }
+ }
+
+ @Override
+ public int getBlockSize(GroupScan scan) {
+ if (scan instanceof JsonTableGroupScan) {
+ return JSON_TABLE_BLOCK_SIZE;
+ } else {
+ return PluginConstants.TABLE_BLOCK_SIZE_DEFAULT;
+ }
+ }
+
+ @Override
+ public int getSequentialBlockReadCost(GroupScan scan) {
+ if (scan instanceof JsonTableGroupScan) {
+ return JSON_BLOCK_SEQ_READ_COST;
+ } else {
+ return PluginConstants.TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+ }
+ }
+
+ @Override
+ public int getRandomBlockReadCost(GroupScan scan) {
+ if (scan instanceof JsonTableGroupScan) {
+ return JSON_BLOCK_RANDOM_READ_COST;
+ } else {
+ return PluginConstants.TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT;
+ }
+ }
+}
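A sketch of how the cost model above resolves under the stock configuration (DrillConfig.create() loads the default config):

  DrillConfig config = DrillConfig.create();
  MapRDBCost costs = new MapRDBCost(config, PluginConstants.SSD);
  // With the SSD media type, sequential and random block reads resolve to
  // the same cost (see the constants above); with HDD, a random read
  // defaults to 1000x the HDD sequential cost.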
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index d4978b9..ee35a68 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
+
import org.apache.drill.exec.store.mapr.TableFormatMatcher;
import org.apache.drill.exec.store.mapr.TableFormatPlugin;
@@ -44,8 +45,7 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
protected boolean isSupportedTable(MapRFileStatus status) throws IOException {
return !getFormatPlugin()
.getMaprFS()
- .getTableProperties(status.getPath())
- .getAttr()
+ .getTableBasicAttrs(status.getPath())
.getIsMarlinTable();
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index fb3ee4d..da4829f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.mapr.PluginConstants;
import org.apache.drill.exec.store.mapr.TableFormatPlugin;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import com.mapr.db.index.IndexDesc;
import com.mapr.fs.tables.TableProperties;
public class MapRDBFormatPlugin extends TableFormatPlugin {
@@ -49,6 +51,11 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
private final MapRDBFormatMatcher matcher;
private final Configuration hbaseConf;
private final Connection connection;
+ private final MapRDBTableCache jsonTableCache;
+ private final int scanRangeSizeMB;
+ private final String mediaType;
+ private final MapRDBCost pluginCostModel;
+ private final int restrictedScanRangeSizeMB;
public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) throws IOException {
@@ -57,6 +64,29 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
hbaseConf = HBaseConfiguration.create(fsConf);
hbaseConf.set(ConnectionFactory.DEFAULT_DB, ConnectionFactory.MAPR_ENGINE2);
connection = ConnectionFactory.createConnection(hbaseConf);
+ jsonTableCache = new MapRDBTableCache(context.getConfig());
+ int scanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB);
+ if (scanRangeSizeMBConfig < 32 || scanRangeSizeMBConfig > 8192) {
+ logger.warn("Invalid scan size {} for MapR-DB tables, using default", scanRangeSizeMBConfig);
+ scanRangeSizeMBConfig = PluginConstants.JSON_TABLE_SCAN_SIZE_MB_DEFAULT;
+ }
+
+ int restrictedScanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB);
+ if (restrictedScanRangeSizeMBConfig < 32 || restrictedScanRangeSizeMBConfig > 8192) {
+ logger.warn("Invalid restricted scan size {} for MapR-DB tables, using default", restrictedScanRangeSizeMBConfig);
+ restrictedScanRangeSizeMBConfig = PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT;
+ }
+
+ String mediaTypeConfig = context.getConfig().getString(PluginConstants.JSON_TABLE_MEDIA_TYPE);
+ if (!(mediaTypeConfig.equals(PluginConstants.SSD) ||
+ mediaTypeConfig.equals(PluginConstants.HDD))) {
+ logger.warn("Invalid media Type {} for MapR-DB JSON tables, using default 'SSD'", mediaTypeConfig);
+ mediaTypeConfig = PluginConstants.JSON_TABLE_MEDIA_TYPE_DEFAULT;
+ }
+ mediaType = mediaTypeConfig;
+ scanRangeSizeMB = scanRangeSizeMBConfig;
+ restrictedScanRangeSizeMB = restrictedScanRangeSizeMBConfig;
+ pluginCostModel = new MapRDBCost(context.getConfig(), mediaType);
}
@Override
@@ -65,19 +95,30 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
}
@Override
+ public MapRDBFormatPluginConfig getConfig() {
+ return (MapRDBFormatPluginConfig)(super.getConfig());
+ }
+
+ public MapRDBTableCache getJsonTableCache() {
+ return jsonTableCache;
+ }
+
+ @Override
@JsonIgnore
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
- return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT);
+ return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT,
+ MapRDBPushProjectIntoScan.PROJECT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_PROJECT,
+ MapRDBPushLimitIntoScan.LIMIT_ON_SCAN);
}
- @Override
+
public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
- List<SchemaPath> columns) throws IOException {
+ List<SchemaPath> columns, IndexDesc indexDesc) throws IOException {
String tableName = getTableName(selection);
TableProperties props = getMaprFS().getTableProperties(new Path(tableName));
if (props.getAttr().getJson()) {
- JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/);
+ JsonScanSpec scanSpec = new JsonScanSpec(tableName, indexDesc, null/*condition*/);
return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
} else {
HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
@@ -85,6 +126,12 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
}
}
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns) throws IOException {
+ return getGroupScan(userName, selection, columns, null /* indexDesc */);
+ }
+
@JsonIgnore
public Configuration getHBaseConf() {
return hbaseConf;
@@ -95,6 +142,18 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
return connection;
}
+ public int getScanRangeSizeMB() {
+ return scanRangeSizeMB;
+ }
+
+ public int getRestrictedScanRangeSizeMB() {
+ return restrictedScanRangeSizeMB;
+ }
+
+ public MapRDBCost getPluginCostModel() {
+ return pluginCostModel;
+ }
+
/**
* Returns the table name for the given FileSelection object
*
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
index ad153fe..07943f6 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -38,6 +38,7 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
* is not regressing performance of reading maprdb table.
*/
public boolean nonExistentFieldSupport = true;
+ public String index = "";
@Override
public int hashCode() {
@@ -65,6 +66,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
return false;
} else if (nonExistentFieldSupport != other.nonExistentFieldSupport) {
return false;
+ } else if (!index.equals(other.index)) {
+ return false;
}
return true;
}
@@ -91,4 +94,5 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
return ignoreSchemaChange;
}
+ public String getIndex() { return this.index; }
}
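The new `index` field acts as a per-table index hint; MapRDBGroupScan.getIndexHint() (below) surfaces it to the planner. A sketch, assuming the implicit no-arg construction that Jackson uses for this config class:

  MapRDBFormatPluginConfig conf = new MapRDBFormatPluginConfig();
  // `index` is a public field defaulting to "" (no hint); equals() now
  // compares it, so two configs differing only in the hint are distinct.
  conf.index = "my_idx";          // hypothetical index name
  String hint = conf.getIndex();  // "my_idx"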
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 9e6a744..1e6bcec 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -34,7 +34,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.index.IndexCollection;
+
+import org.apache.drill.exec.planner.cost.PluginCost;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -46,7 +50,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-public abstract class MapRDBGroupScan extends AbstractGroupScan {
+public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
protected AbstractStoragePlugin storagePlugin;
@@ -59,7 +63,9 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
- protected NavigableMap<TabletFragmentInfo, String> regionsToScan;
+ protected NavigableMap<TabletFragmentInfo, String> doNotAccessRegionsToScan;
+
+ protected double costFactor = 1.0;
private boolean filterPushedDown = false;
@@ -80,8 +86,13 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
this.formatPlugin = that.formatPlugin;
this.formatPluginConfig = that.formatPluginConfig;
this.storagePlugin = that.storagePlugin;
- this.regionsToScan = that.regionsToScan;
this.filterPushedDown = that.filterPushedDown;
+ this.costFactor = that.costFactor;
+ /* this is the only place we access the field `doNotAccessRegionsToScan` directly
+ * because we do not want the sub-scan spec for JSON tables to be calculated
+ * during the copy-constructor
+ */
+ this.doNotAccessRegionsToScan = that.doNotAccessRegionsToScan;
}
public MapRDBGroupScan(AbstractStoragePlugin storagePlugin,
@@ -102,8 +113,8 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
endpointMap.put(ep.getAddress(), ep);
}
- Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
- for (String serverName : regionsToScan.values()) {
+ final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+ for (String serverName : getRegionsToScan().values()) {
DrillbitEndpoint ep = endpointMap.get(serverName);
if (ep != null) {
EndpointAffinity affinity = affinityMap.get(ep);
@@ -127,6 +138,7 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
watch.reset();
watch.start();
+ final NavigableMap<TabletFragmentInfo, String> regionsToScan = getRegionsToScan();
final int numSlots = incomingEndpoints.size();
Preconditions.checkArgument(numSlots <= regionsToScan.size(),
String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
@@ -161,7 +173,7 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
hostIndexQueue.add(i);
}
- Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+ Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newLinkedHashSet(regionsToScan.entrySet());
/*
* First, we assign regions which are hosted on region servers running on drillbit endpoints
@@ -175,7 +187,8 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
if (endpointIndexlist != null) {
Integer slotIndex = endpointIndexlist.poll();
List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
- endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
+ MapRDBSubScanSpec subScanSpec = getSubScanSpec(regionEntry.getKey());
+ endpointSlotScanList.add(subScanSpec);
// add to the tail of the slot list, to add more later in round robin fashion
endpointIndexlist.offer(slotIndex);
// this region has been assigned
@@ -202,7 +215,8 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
if (regionsToAssignSet.size() > 0) {
for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
List<MapRDBSubScanSpec> smallestList = minHeap.poll();
- smallestList.add(getSubScanSpec(regionEntry.getKey()));
+ MapRDBSubScanSpec subScanSpec = getSubScanSpec(regionEntry.getKey());
+ smallestList.add(subScanSpec);
if (smallestList.size() < maxPerEndpointSlot) {
minHeap.offer(smallestList);
}
@@ -224,6 +238,10 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
}
}
+ for (Entry<Integer, List<MapRDBSubScanSpec>> endpoint : endpointFragmentMapping.entrySet()) {
+ Collections.sort(endpoint.getValue());
+ }
+
/* no slot should be empty at this point */
assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
"Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
@@ -235,7 +253,7 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
@Override
public int getMaxParallelizationWidth() {
- return regionsToScan.size();
+ return getRegionsToScan().size();
}
@JsonIgnore
@@ -273,11 +291,48 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
this.filterPushedDown = true;
}
+ public String getIndexHint() { return this.formatPluginConfig.getIndex(); }
+
@JsonIgnore
+ @Override
public boolean isFilterPushedDown() {
return filterPushedDown;
}
protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key);
+ public void setCostFactor(double sel) {
+ this.costFactor = sel;
+ }
+
+ @Override
+ public IndexCollection getSecondaryIndexCollection(RelNode scanRel) {
+ //XXX to implement for complete secondary index framework
+ return null;
+ }
+
+ @JsonIgnore
+ public abstract String getTableName();
+
+ @JsonIgnore
+ public int getRowKeyOrdinal() {
+ return 0;
+ }
+
+ protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
+ return doNotAccessRegionsToScan;
+ }
+
+ protected void resetRegionsToScan() {
+ this.doNotAccessRegionsToScan = null;
+ }
+
+ protected void setRegionsToScan(NavigableMap<TabletFragmentInfo, String> regionsToScan) {
+ this.doNotAccessRegionsToScan = regionsToScan;
+ }
+
+ @Override
+ public PluginCost getPluginCostModel() {
+ return formatPlugin.getPluginCostModel();
+ }
}
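
Editorial note on the assignment logic above: the applyAssignments() changes preserve the original two-phase strategy (regions are first handed out round-robin to endpoints that host them locally, and whatever remains is spread across the least-loaded slots via a min-heap) and only add deterministic iteration plus per-slot sorting. A minimal standalone sketch of the min-heap phase, with hypothetical names and no MapR dependencies:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class SlotBalanceSketch {
  // Hand each remaining region to the slot that currently holds the fewest regions.
  static Map<Integer, List<String>> assign(List<String> regions, int numSlots) {
    Map<Integer, List<String>> slots = new HashMap<>();
    for (int i = 0; i < numSlots; i++) {
      slots.put(i, new ArrayList<>());
    }
    PriorityQueue<Integer> minHeap =
        new PriorityQueue<>(Comparator.comparingInt(s -> slots.get(s).size()));
    minHeap.addAll(slots.keySet());
    for (String region : regions) {
      Integer smallest = minHeap.poll(); // slot with the fewest regions so far
      slots.get(smallest).add(region);
      minHeap.offer(smallest);           // re-insert with its new size
    }
    return slots;
  }

  public static void main(String[] args) {
    System.out.println(assign(List.of("r1", "r2", "r3", "r4", "r5"), 2));
    // e.g. {0=[r1, r3, r5], 1=[r2, r4]}
  }
}

Polling a slot before mutating it and re-offering it afterwards keeps the heap invariant valid at all times, which is the same pattern the patch uses with its PriorityQueue of sub-scan lists.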
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index c233a6b..cf49714 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -137,8 +137,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
return; //no filter pushdown ==> No transformation.
}
- // clone the groupScan with the newScanSpec.
- final JsonTableGroupScan newGroupsScan = groupScan.clone(newScanSpec);
+ final JsonTableGroupScan newGroupsScan = (JsonTableGroupScan) groupScan.clone(newScanSpec);
newGroupsScan.setFilterPushedDown(true);
final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
new file mode 100644
index 0000000..79eec12
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.LimitPrel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+
+public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushLimitIntoScan.class);
+
+ private MapRDBPushLimitIntoScan(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ public static final StoragePluginOptimizerRule LIMIT_ON_SCAN =
+ new MapRDBPushLimitIntoScan(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(ScanPrel.class)),
+ "MapRDBPushLimitIntoScan:Limit_On_Scan") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(1);
+ final LimitPrel limit = call.rel(0);
+ doPushLimitIntoGroupScan(call, limit, null, scan, scan.getGroupScan());
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(1);
+ final LimitPrel limit = call.rel(0);
+ // Pushdown applies only to the limit, not the offset,
+ // so if getFetch() returns null there is no need to run this rule.
+ if (scan.getGroupScan().supportsLimitPushdown()
+ && !limit.isPushDown() && limit.getFetch() != null) {
+ if ((scan.getGroupScan() instanceof JsonTableGroupScan
+ && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan()) ) {
+ //|| (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+ public static final StoragePluginOptimizerRule LIMIT_ON_PROJECT =
+ new MapRDBPushLimitIntoScan(RelOptHelper.some(LimitPrel.class,
+ RelOptHelper.any(ProjectPrel.class)), "MapRDBPushLimitIntoScan:Limit_On_Project") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ProjectPrel project = call.rel(1);
+ final LimitPrel limit = call.rel(0);
+ RelNode child = project.getInput();
+ final RelNode limitUnderProject = new LimitPrel(child.getCluster(), child.getTraitSet(),
+ child, limit.getOffset(), limit.getFetch());
+ final RelNode newProject = new ProjectPrel(project.getCluster(), project.getTraitSet(),
+ limitUnderProject, project.getProjects(), project.getRowType());
+ if (DrillRelOptUtil.isProjectFlatten(project)) {
+ //Preserve limit above the project since Flatten can produce more rows. Also mark it so we do not fire the rule again.
+ child = newProject;
+ final RelNode limitAboveProject = new LimitPrel(child.getCluster(), child.getTraitSet(),
+ child, limit.getOffset(), limit.getFetch(), true);
+ call.transformTo(limitAboveProject);
+ } else {
+ call.transformTo(newProject);
+ }
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LimitPrel limitPrel = call.rel(0);
+ ProjectPrel projectPrel = call.rel(1);
+ // Pushdown applies only to the limit, not the offset,
+ // so if getFetch() returns null there is no need to run this rule.
+ // Do not push the limit across a Project containing CONVERT_FROMJSON for limit 0 queries: doing so would
+ // mess up the schema, because Convert_FromJson() differs from regular functions in that its output schema
+ // is only known after evaluation. When the input has 0 rows, Drill has no way to determine the output type.
+ if (!limitPrel.isPushDown() && (limitPrel.getFetch() != null)
+ && (!DrillRelOptUtil.isLimit0(limitPrel.getFetch())
+ || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectPrel))) {
+ return true;
+ }
+ return false;
+ }
+ };
+
+ protected void doPushLimitIntoGroupScan(RelOptRuleCall call,
+ LimitPrel limit, final ProjectPrel project, ScanPrel scan, GroupScan groupScan) {
+ try {
+ final GroupScan newGroupScan = getGroupScanWithLimit(groupScan, limit);
+ if (newGroupScan == null) {
+ return;
+ }
+ final ScanPrel newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan,
+ scan.getRowType());
+ final RelNode newChild;
+ if (project != null) {
+ final ProjectPrel newProject = new ProjectPrel(project.getCluster(), project.getTraitSet(),
+ newScan, project.getProjects(), project.getRowType());
+ newChild = newProject;
+ } else {
+ newChild = newScan;
+ }
+ call.transformTo(newChild);
+ logger.debug("pushLimitIntoGroupScan: Converted to a new ScanPrel " + newScan.getGroupScan());
+ } catch (Exception e) {
+ logger.warn("pushLimitIntoGroupScan: Exception while trying limit pushdown!", e);
+ }
+ }
+
+ private GroupScan getGroupScanWithLimit(GroupScan groupScan, LimitPrel limit) {
+ final int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0;
+ final int fetch = Math.max(0, RexLiteral.intValue(limit.getFetch()));
+ // Scan Limit uses conservative approach: use offset 0 and fetch = parent limit offset + parent limit fetch.
+ if (groupScan instanceof JsonTableGroupScan) {
+ JsonTableGroupScan jsonTableGroupScan = (JsonTableGroupScan) groupScan;
+ return (jsonTableGroupScan.clone(jsonTableGroupScan.getScanSpec()).applyLimit(offset + fetch));
+ } else if (groupScan instanceof BinaryTableGroupScan) {
+ BinaryTableGroupScan binaryTableGroupScan = (BinaryTableGroupScan) groupScan;
+ final HBaseScanSpec oldScanSpec = binaryTableGroupScan.getHBaseScanSpec();
+ final HBaseScanSpec newScanSpec = new HBaseScanSpec(oldScanSpec.getTableName(), oldScanSpec.getStartRow(),
+ oldScanSpec.getStopRow(), oldScanSpec.getFilter());
+ return new BinaryTableGroupScan(binaryTableGroupScan.getUserName(), binaryTableGroupScan.getStoragePlugin(),
+ binaryTableGroupScan.getFormatPlugin(), newScanSpec, binaryTableGroupScan.getColumns(),
+ binaryTableGroupScan.getTableStats()).applyLimit(offset + fetch);
+ }
+ return null;
+ }
+}
+
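The comment in getGroupScanWithLimit() above notes the conservative rewrite: since the scan cannot skip rows itself, the pushed-down fetch must absorb the parent's offset. A standalone sketch of that arithmetic, illustrative only:

public class LimitPushdownSketch {
  // Scan-side limit: offset is forced to 0 and fetch absorbs the parent's offset,
  // so the parent LimitPrel can still drop the first `offset` rows afterwards.
  static int pushedDownFetch(Integer offset, int fetch) {
    int off = (offset != null) ? Math.max(0, offset) : 0;
    return off + Math.max(0, fetch);
  }

  public static void main(String[] args) {
    // SELECT ... LIMIT 10 OFFSET 5 => the scan must read 15 rows
    System.out.println(pushedDownFetch(5, 10)); // 15
  }
}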
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
new file mode 100644
index 0000000..8d4f549
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+
+import java.util.List;
+
+public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRule {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushProjectIntoScan.class);
+
+ private MapRDBPushProjectIntoScan(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ public static final StoragePluginOptimizerRule PROJECT_ON_SCAN = new MapRDBPushProjectIntoScan(
+ RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushProjIntoScan:Proj_On_Scan") {
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ final ProjectPrel project = (ProjectPrel) call.rel(0);
+ if (!(scan.getGroupScan() instanceof MapRDBGroupScan)) {
+ return;
+ }
+ // Both BinaryTableGroupScan and JsonTableGroupScan extend MapRDBGroupScan,
+ // so a single dispatch covers both table types.
+ doPushProjectIntoGroupScan(call, project, scan, (MapRDBGroupScan) scan.getGroupScan());
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
+ scan.getGroupScan() instanceof JsonTableGroupScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+ };
+
+ protected void doPushProjectIntoGroupScan(RelOptRuleCall call,
+ ProjectPrel project, ScanPrel scan, MapRDBGroupScan groupScan) {
+ try {
+
+ PrelUtil.ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), project.getProjects());
+ if (columnInfo == null || columnInfo.isStarQuery()
+ || !groupScan.canPushdownProjects(columnInfo.columns)) {
+ return;
+ }
+ RelTraitSet newTraits = call.getPlanner().emptyTraitSet();
+ // Clear out the collation trait; RelTraitSet is immutable, so plus() must be reassigned
+ for (RelTrait trait : scan.getTraitSet()) {
+ if (!(trait instanceof RelCollation)) {
+ newTraits = newTraits.plus(trait);
+ }
+ }
+ final ScanPrel newScan = new ScanPrel(scan.getCluster(), newTraits.plus(Prel.DRILL_PHYSICAL),
+ groupScan.clone(columnInfo.columns),
+ columnInfo.createNewRowType(project.getInput().getCluster().getTypeFactory()));
+
+ List<RexNode> newProjects = Lists.newArrayList();
+ for (RexNode n : project.getChildExps()) {
+ newProjects.add(n.accept(columnInfo.getInputRewriter()));
+ }
+
+ final ProjectPrel newProj =
+ new ProjectPrel(project.getCluster(),
+ project.getTraitSet().plus(Prel.DRILL_PHYSICAL),
+ newScan,
+ newProjects,
+ project.getRowType());
+
+ if (ProjectRemoveRule.isTrivial(newProj) &&
+ // the old project did not involve any column renaming
+ sameRowTypeProjectionsFields(project.getRowType(), newScan.getRowType())) {
+ call.transformTo(newScan);
+ } else {
+ call.transformTo(newProj);
+ }
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ private boolean sameRowTypeProjectionsFields(RelDataType oldRowType, RelDataType newRowType) {
+ for (RelDataTypeField oldField : oldRowType.getFieldList()) {
+ String oldProjName = oldField.getName();
+ boolean match = false;
+ for (RelDataTypeField newField : newRowType.getFieldList()) {
+ if (oldProjName.equals(newField.getName())) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
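The sameRowTypeProjectionsFields() helper above performs a quadratic name scan. Shown purely as an illustration, a set-based formulation of the same "no column was renamed" check makes the intent explicit:

import java.util.HashSet;
import java.util.List;

public class NamePreservationSketch {
  // True iff every field name the old project produced still appears in the
  // new scan's row type, i.e. the rewrite performed no column renaming.
  static boolean namesPreserved(List<String> oldNames, List<String> newNames) {
    return new HashSet<>(newNames).containsAll(oldNames);
  }

  public static void main(String[] args) {
    System.out.println(namesPreserved(List.of("a", "b"), List.of("b", "a", "c"))); // true
    System.out.println(namesPreserved(List.of("a", "x"), List.of("a", "b")));      // false
  }
}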
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
new file mode 100644
index 0000000..89ce95d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mapr.db.json.RestrictedJsonRecordReader;
+
+import java.util.List;
+
+public class MapRDBRestrictedScanBatchCreator implements BatchCreator<RestrictedMapRDBSubScan> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBRestrictedScanBatchCreator.class);
+
+ @Override
+ public ScanBatch getBatch(ExecutorFragmentContext context, RestrictedMapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ List<RecordReader> readers = Lists.newArrayList();
+ for (MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()) {
+ try {
+ readers.add(new RestrictedJsonRecordReader((RestrictedMapRDBSubScanSpec)scanSpec, subScan.getFormatPlugin(), subScan.getColumns(),
+ context, subScan.getMaxRecordsToRead()));
+ } catch (Exception e1) {
+ throw new ExecutionSetupException(e1);
+ }
+ }
+ return new ScanBatch(subScan, context, readers, true);
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index 7e4e244..de2817e 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -33,7 +33,9 @@ import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
+public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
+
@Override
public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
@@ -46,7 +48,7 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
getHBaseSubScanSpec(scanSpec),
subScan.getColumns()));
} else {
- readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
+ readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPlugin(), subScan.getColumns(), context, subScan.getMaxRecordsToRead()));
}
} catch (Exception e) {
throw new ExecutionSetupException(e);
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
index 5766b9b..159b850 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.AbstractDbSubScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
@@ -40,12 +40,14 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
// Class containing information for reading a single HBase region
@JsonTypeName("maprdb-sub-scan")
-public class MapRDBSubScan extends AbstractBase implements SubScan {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
+
+public class MapRDBSubScan extends AbstractDbSubScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
private final MapRDBFormatPlugin formatPlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
+ private final int maxRecordsToRead;
private final String tableType;
@JsonCreator
@@ -55,20 +57,28 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
@JsonProperty("storageConfig") StoragePluginConfig storageConfig,
@JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
@JsonProperty("tableType") String tableType) throws ExecutionSetupException {
this(userName,
(MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
regionScanSpecList,
columns,
+ maxRecordsToRead,
tableType);
}
public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
+ this(userName, formatPlugin, maprSubScanSpecs, columns, -1, tableType);
+ }
+
+ public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
+ List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) {
super(userName);
this.formatPlugin = formatPlugin;
this.regionScanSpecList = maprSubScanSpecs;
this.columns = columns;
+ this.maxRecordsToRead = maxRecordsToRead;
this.tableType = tableType;
}
@@ -93,6 +103,11 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
return columns;
}
+ @JsonProperty("maxRecordsToRead")
+ public int getMaxRecordsToRead() {
+ return maxRecordsToRead;
+ }
+
@JsonProperty("tableType")
public String getTableType() {
return tableType;
@@ -121,7 +136,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
@Override
public int getOperatorType() {
- return CoreOperatorType.MAPRDB_SUB_SCAN_VALUE;
+ return CoreOperatorType.MAPRDB_SUB_SCAN_VALUE;
}
@JsonIgnore
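The new maxRecordsToRead field defaults to -1 ("no cap") through the delegating constructor above. A hypothetical caller-side helper, assuming the same package as the plugin classes, showing when each construction path applies:

import java.util.List;
import org.apache.drill.common.expression.SchemaPath;

class SubScanCapSketch {
  // Use the capped constructor only when the planner actually pushed a limit;
  // otherwise fall back to the -1 ("read everything") default.
  static MapRDBSubScan create(String user, MapRDBFormatPlugin plugin,
      List<MapRDBSubScanSpec> specs, List<SchemaPath> columns, Integer cap, String tableType) {
    return (cap != null && cap >= 0)
        ? new MapRDBSubScan(user, plugin, specs, columns, cap, tableType)
        : new MapRDBSubScan(user, plugin, specs, columns, tableType);
  }
}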
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
index 40ab4f4..e24438e 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
@@ -19,32 +19,39 @@ package org.apache.drill.exec.store.mapr.db;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mapr.db.index.IndexDesc;
import com.mapr.fs.jni.MapRConstants;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
-public class MapRDBSubScanSpec {
+public class MapRDBSubScanSpec implements Comparable<MapRDBSubScanSpec> {
protected String tableName;
+ protected IndexDesc indexDesc;
protected String regionServer;
protected byte[] startRow;
protected byte[] stopRow;
protected byte[] serializedFilter;
+ protected String userName;
@JsonCreator
public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("indexDesc") IndexDesc indexDesc,
@JsonProperty("regionServer") String regionServer,
@JsonProperty("startRow") byte[] startRow,
@JsonProperty("stopRow") byte[] stopRow,
@JsonProperty("serializedFilter") byte[] serializedFilter,
- @JsonProperty("filterString") String filterString) {
+ @JsonProperty("filterString") String filterString,
+ @JsonProperty("username") String userName) {
if (serializedFilter != null && filterString != null) {
throw new IllegalArgumentException("The parameters 'serializedFilter' and 'filterString' cannot both be specified at the same time.");
}
this.tableName = tableName;
+ this.indexDesc = indexDesc;
this.regionServer = regionServer;
this.startRow = startRow;
this.stopRow = stopRow;
this.serializedFilter = serializedFilter;
+ this.userName = userName;
}
/* package */ MapRDBSubScanSpec() {
@@ -55,6 +62,10 @@ public class MapRDBSubScanSpec {
return tableName;
}
+ public IndexDesc getIndexDesc() {
+ return indexDesc;
+ }
+
public MapRDBSubScanSpec setTableName(String tableName) {
this.tableName = tableName;
return this;
@@ -102,13 +113,50 @@ public class MapRDBSubScanSpec {
return this;
}
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
@Override
public String toString() {
return "MapRDBSubScanSpec [tableName=" + tableName
+ ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
+ ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
+ ", filter=" + (getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter()))
- + ", regionServer=" + regionServer + "]";
+ + ", regionServer=" + regionServer
+ + ", userName=" + userName + "]";
}
+ /*
+ * The semantics of this compareTo function are the same as those of TabletInfoImpl:
+ * it compares the startRows of the two subScanSpecs and, if they are equal,
+ * falls back to comparing the stopRows. An empty stopRow means the scan runs to
+ * the end of the table, so it sorts after any concrete stop key.
+ */
+ @Override
+ public int compareTo(MapRDBSubScanSpec o) {
+ if (o == null) {
+ return 1;
+ } else {
+ int result = Bytes.compareTo(this.getStartRow(), o.getStartRow());
+ if (result != 0) {
+ return result;
+ } else {
+ result = Bytes.compareTo(this.getStopRow(), o.getStopRow());
+ if (result != 0) {
+ if (this.getStartRow().length != 0 && this.getStopRow().length == 0) {
+ return 1;
+ } else {
+ return o.getStartRow().length != 0 && o.getStopRow().length == 0 ? -1 : result;
+ }
+ } else {
+ return 0;
+ }
+ }
+ }
+ }
}
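Because an empty stopRow means "scan to the end of the table", compareTo() must sort such a spec after any spec with a concrete stop key. A standalone sketch of just that tie-breaking rule, using java.util.Arrays.compare (signed) in place of the unsigned Bytes.compareTo:

import java.util.Arrays;

public class StopRowOrderSketch {
  // Mirror of the tie-breaking rule in MapRDBSubScanSpec.compareTo(): equal startRows
  // fall back to stopRows, where an empty stopRow sorts as +infinity.
  static int compare(byte[] start, byte[] stop, byte[] oStart, byte[] oStop) {
    int result = Arrays.compare(start, oStart);
    if (result != 0) {
      return result;
    }
    result = Arrays.compare(stop, oStop);
    if (result == 0) {
      return 0;
    }
    if (start.length != 0 && stop.length == 0) {
      return 1; // this range runs to end-of-table
    }
    return (oStart.length != 0 && oStop.length == 0) ? -1 : result;
  }

  public static void main(String[] args) {
    byte[] k1 = {1}, k2 = {2}, end = {};
    System.out.println(compare(k1, k2, k1, end)); // -1: the bounded range sorts first
  }
}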
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableCache.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableCache.java
new file mode 100644
index 0000000..f35a4c4
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableCache.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.mapr.db.Table;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.index.IndexDesc;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+public class MapRDBTableCache {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBTableCache.class);
+
+ public static final String FORMAT_MAPRDB_JSON_TABLE_CACHE_ENABLED = "format-maprdb.json.tableCache.enabled";
+
+ public static final String FORMAT_MAPRDB_JSON_TABLE_CACHE_SIZE = "format-maprdb.json.tableCache.size";
+
+ public static final String FORMAT_MAPRDB_JSON_TABLE_CACHE_TIMEOUT = "format-maprdb.json.tableCache.expireTimeInMinutes";
+
+ private static final int MIN_TABLE_CACHE_SIZE = 1;
+
+ private static final int MIN_TABLE_CACHE_ENTRY_TIMEOUT = 10;
+
+ LoadingCache<MapRDBTableCache.Key, Table> tableCache;
+
+ private final boolean tableCachingEnabled;
+
+ public MapRDBTableCache(DrillConfig config) {
+ tableCachingEnabled = config.getBoolean(FORMAT_MAPRDB_JSON_TABLE_CACHE_ENABLED);
+ if (tableCachingEnabled) {
+ final int tableCacheSize = Math.max((int) (config.getDouble(FORMAT_MAPRDB_JSON_TABLE_CACHE_SIZE)), MIN_TABLE_CACHE_SIZE);
+ final int tableCacheExpiryTime = Math.max((int) (config.getDouble(FORMAT_MAPRDB_JSON_TABLE_CACHE_TIMEOUT)), MIN_TABLE_CACHE_ENTRY_TIMEOUT);
+
+ RemovalListener<MapRDBTableCache.Key, Table> removalListener = new RemovalListener<MapRDBTableCache.Key, Table>() {
+ public void onRemoval(RemovalNotification<MapRDBTableCache.Key, Table> removal) {
+ Table table = removal.getValue();
+ MapRDBTableCache.Key key = removal.getKey();
+ logger.debug("time {} closing the tablePath {} tableHandle {} index {} userName {}",
+ System.nanoTime(),
+ key.path == null ? "null" : key.path, table == null ? "null" : table,
+ key.indexDesc == null ? "null" : key.indexDesc.getIndexName(),
+ key.ugi.getUserName() == null ? "null" : key.ugi.getUserName());
+ if (table != null) {
+ table.close(); // close the evicted table handle
+ }
+ }
+ };
+
+ // Common table cache for primary and index tables. Key is Pair<tablePath, indexDesc>
+ // For primary table, indexDesc is null.
+ tableCache = CacheBuilder.newBuilder().
+ expireAfterAccess(tableCacheExpiryTime, TimeUnit.MINUTES).
+ maximumSize(tableCacheSize).
+ removalListener(removalListener).build(new CacheLoader<MapRDBTableCache.Key, Table>() {
+
+ @Override
+ public Table load(final MapRDBTableCache.Key key) throws Exception {
+ // getTable is already calling tableCache.get in correct user UGI context, so should be fine here.
+ // key.path is the table path; key.indexDesc is the index descriptor (null for the primary table).
+ Table table = (key.indexDesc == null ? MapRDBImpl.getTable(key.path) : MapRDBImpl.getIndexTable(key.indexDesc));
+ logger.debug("time {} opened the table for tablePath {} tableHandle {} index {} userName {}",
+ System.nanoTime(),
+ key.path == null ? "null" : key.path,
+ table == null ? "null" : table,
+ key.indexDesc == null ? "null" : key.indexDesc.getIndexName(),
+ key.ugi.getUserName() == null ? "null" : key.ugi.getUserName());
+ return table;
+ }
+ });
+
+ logger.debug("table cache created with size {} and expiryTimeInMin {} ", tableCacheSize, tableCacheExpiryTime);
+ }
+ }
+
+
+ /**
+ * Returns a Table handle for the given primary table path and indexDesc:
+ * the corresponding index table if indexDesc is not null,
+ * otherwise the primary table itself.
+ *
+ * @param tablePath primary table path
+ * @param indexDesc index table descriptor
+ * @param userName user to impersonate while opening the table
+ */
+ public Table getTable(final Path tablePath, final IndexDesc indexDesc, final String userName) throws DrillRuntimeException {
+
+ final Table dbTableHandle;
+ final UserGroupInformation proxyUserUgi = ImpersonationUtil.createProxyUgi(userName);
+
+ try {
+ dbTableHandle = proxyUserUgi.doAs(new PrivilegedExceptionAction<Table>() {
+ public Table run() throws Exception {
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Getting MaprDB Table handle for proxy user: " + UserGroupInformation.getCurrentUser());
+ }
+
+ if (tableCachingEnabled) {
+ Table table = tableCache.get(new MapRDBTableCache.Key(tablePath, indexDesc));
+ logger.trace("time {} get the tablePath {} tableHandle {} index {} userName {} currentUser {}",
+ System.nanoTime(), tablePath == null ? "null" : tablePath,
+ table == null ? "null" : table,
+ indexDesc == null ? "null" : indexDesc.getIndexName(),
+ userName == null ? "null" : userName,
+ UserGroupInformation.getCurrentUser() == null ? "null" : UserGroupInformation.getCurrentUser());
+ return table;
+ } else {
+ return indexDesc == null ? MapRDBImpl.getTable(tablePath) : MapRDBImpl.getIndexTable(indexDesc);
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error getting table: " + tablePath.toString() + (indexDesc == null ? "" : (", " +
+ "IndexDesc: " + indexDesc.toString())), e);
+ }
+
+ return dbTableHandle;
+ }
+
+ /**
+ * Returns a Table handle for the primary table with the given name.
+ *
+ * @param tableName primary table name
+ */
+ public Table getTable(String tableName, String userName) {
+ return getTable(new Path(tableName), null, userName);
+ }
+
+ /**
+ * Returns a Table handle for the primary table at the given path.
+ *
+ * @param tablePath primary table path
+ */
+ public Table getTable(Path tablePath, String userName) {
+ return getTable(tablePath, null, userName);
+ }
+
+ /**
+ * Returns a Table handle for the given primary table name and indexDesc:
+ * the corresponding index table if indexDesc is not null,
+ * otherwise the primary table itself.
+ *
+ * @param tableName primary table name
+ * @param indexDesc index table descriptor
+ */
+ public Table getTable(String tableName, IndexDesc indexDesc, String userName) {
+ return getTable(new Path(tableName), indexDesc, userName);
+ }
+
+ /**
+ * Closes the given table handle, but only when caching is disabled;
+ * cached handles are closed by the removal listener upon eviction.
+ *
+ * @param table table to be closed
+ */
+ public void closeTable(Table table) {
+ if (!tableCachingEnabled && table != null) {
+ table.close();
+ }
+ }
+
+ /**
+ * Key for {@link MapRDBTableCache} to store table path, {@link IndexDesc} and UGI.
+ */
+ static class Key {
+ final Path path;
+
+ final IndexDesc indexDesc;
+
+ final UserGroupInformation ugi;
+
+ Key(Path path, IndexDesc indexDesc) throws IOException {
+ this.path = path;
+ this.indexDesc = indexDesc;
+ this.ugi = UserGroupInformation.getCurrentUser();
+ }
+
+ public int hashCode() {
+ final int idxDescHashCode = (indexDesc == null) ? 0 : indexDesc.getIndexFid().hashCode();
+ return (path.hashCode() + idxDescHashCode + ugi.hashCode());
+ }
+
+ static boolean isEqual(Object a, Object b) {
+ return a == b || a != null && a.equals(b);
+ }
+
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof MapRDBTableCache.Key) {
+ MapRDBTableCache.Key that = (MapRDBTableCache.Key) obj;
+ return isEqual(this.path, that.path)
+ && isEqual(this.indexDesc, that.indexDesc)
+ && isEqual(this.ugi, that.ugi);
+ } else {
+ return false;
+ }
+ }
+
+ public String toString() {
+ return "(Path: " + this.path.toString() +
+ ", UGI: " + this.ugi.toString() +
+ ", IndexDesc: " + (this.indexDesc == null ? "" : this.indexDesc.toString()) + ")";
+ }
+ }
+}
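A hypothetical caller-side sketch of the cache above (the table path and user name are illustrative): closeTable() deliberately does nothing while caching is enabled, leaving the actual close to the removal listener on eviction or expiry.

import com.mapr.db.Table;
import org.apache.drill.common.config.DrillConfig;

class TableCacheUsageSketch {
  // getTable() resolves either the primary table (indexDesc == null) or an
  // index table, under the impersonated user's UGI.
  static void scanOnce(DrillConfig config) {
    MapRDBTableCache cache = new MapRDBTableCache(config);
    Table table = cache.getTable("/tables/orders", "drilluser");
    try {
      // ... read documents through the handle ...
    } finally {
      cache.closeTable(table); // no-op while caching is enabled
    }
  }
}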
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
new file mode 100644
index 0000000..eedbca5
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+/**
+ * A RestrictedMapRDBSubScan is intended for skip-scan (as opposed to sequential scan) operations,
+ * where the set of rowkeys is obtained from a corresponding RowKeyJoin instance.
+ */
+@JsonTypeName("maprdb-restricted-subscan")
+public class RestrictedMapRDBSubScan extends MapRDBSubScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RestrictedMapRDBSubScan.class);
+
+ @JsonCreator
+ public RestrictedMapRDBSubScan(@JacksonInject StoragePluginRegistry engineRegistry,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
+ @JsonProperty("storageConfig") StoragePluginConfig storageConfig,
+ @JsonProperty("regionScanSpecList") List<RestrictedMapRDBSubScanSpec> regionScanSpecList,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
+ @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
+ this(userName,
+ (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
+ regionScanSpecList, columns, maxRecordsToRead, tableType);
+ }
+
+ public RestrictedMapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
+ List<RestrictedMapRDBSubScanSpec> maprDbSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) {
+ super(userName, formatPlugin, new ArrayList<MapRDBSubScanSpec>(), columns, maxRecordsToRead, tableType);
+
+ for (RestrictedMapRDBSubScanSpec restrictedSpec : maprDbSubScanSpecs) {
+ getRegionScanSpecList().add(restrictedSpec);
+ }
+ }
+
+ @Override
+ public void addJoinForRestrictedSubScan(RowKeyJoin rjbatch) {
+ // currently, all subscan specs are sharing the same join batch instance
+ for (MapRDBSubScanSpec s : getRegionScanSpecList()) {
+ assert (s instanceof RestrictedMapRDBSubScanSpec);
+ ((RestrictedMapRDBSubScanSpec)s).setJoinForSubScan(rjbatch);
+ }
+ }
+
+ @Override
+ public boolean isRestrictedSubScan() {
+ return true;
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
new file mode 100644
index 0000000..bd8a32a
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import com.mapr.db.impl.IdCodec;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+import org.apache.drill.exec.record.AbstractRecordBatch.BatchState;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A RestrictedMapRDBSubScanSpec encapsulates a join instance that supplies the ValueVectors of the
+ * row keys associated with this sub-scan, and exposes an iterator-style interface over those vectors.
+ */
+public class RestrictedMapRDBSubScanSpec extends MapRDBSubScanSpec {
+
+ /**
+ * The RowKeyJoin instance (specific to one minor fragment) which will supply this
+ * subscan with the set of rowkeys. For efficiency, we keep a reference to this
+ * join rather than making another copy of the rowkeys.
+ */
+ private RowKeyJoin rjbatch = null;
+
+ /**
+ * The following are needed to maintain internal state of iteration over the set
+ * of row keys
+ */
+ private ValueVector rowKeyVector = null; // the current row key value vector
+ private int currentIndex = 0; // the current index within the row key vector
+ private int maxOccupiedIndex = -1; // max occupied index within a row key vector
+
+ public RestrictedMapRDBSubScanSpec(String tableName, String regionServer, byte[] serializedFilter, String userName) {
+ super(tableName, null, regionServer, null, null, serializedFilter, null, userName);
+ }
+
+ /* package */ RestrictedMapRDBSubScanSpec() {
+ // empty constructor, to be used with the builder pattern
+ }
+
+ public void setJoinForSubScan(RowKeyJoin rjbatch) {
+ this.rjbatch = rjbatch;
+ }
+
+ @JsonIgnore
+ public RowKeyJoin getJoinForSubScan() {
+ return rjbatch;
+ }
+
+ @JsonIgnore
+ private void init(Pair<ValueVector, Integer> b) {
+ this.maxOccupiedIndex = b.getRight();
+ this.rowKeyVector = b.getLeft();
+ this.currentIndex = 0;
+ }
+
+ /**
+ * Return {@code true} if a valid rowkey batch is available, {@code false} otherwise
+ */
+ @JsonIgnore
+ public boolean readyToGetRowKey() {
+ return rjbatch != null && rjbatch.hasRowKeyBatch();
+ }
+
+ /**
+ * Return {@code true} if the row key join is in the build schema phase
+ */
+ @JsonIgnore
+ public boolean isBuildSchemaPhase() {
+ return rjbatch.getBatchState() == BatchState.BUILD_SCHEMA;
+ }
+
+ /**
+ * Returns {@code true} if the iteration has more row keys.
+ * (In other words, returns {@code true} if {@link #nextRowKey} would
+ * return a non-null row key)
+ * @return {@code true} if the iteration has more row keys
+ */
+ @JsonIgnore
+ public boolean hasRowKey() {
+ if (rowKeyVector != null && currentIndex <= maxOccupiedIndex) {
+ return true;
+ }
+
+ if (rjbatch != null) {
+ Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
+
+ // note that the hash table could be null initially during the BUILD_SCHEMA phase
+ if (currentBatch != null) {
+ init(currentBatch);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @JsonIgnore
+ public int getMaxRowKeysToBeRead() {
+ if (rjbatch != null) {
+ Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
+
+ // note that the currentBatch could be null initially during the BUILD_SCHEMA phase
+ if (currentBatch != null) {
+ init(currentBatch);
+ }
+ }
+ return maxOccupiedIndex + 1;
+ }
+
+ /**
+ * Returns the number of rowKeys that can be read.
+ * At most numRowKeysToRead keys are returned; fewer are returned
+ * if the current batch contains fewer than that many keys.
+ */
+ @JsonIgnore
+ public int hasRowKeys(int numRowKeysToRead) {
+ int numKeys = 0;
+
+ // if there are pending rows in the current batch, read them first,
+ // in chunks of numRowKeysToRead rows
+ if (rowKeyVector != null && currentIndex <= maxOccupiedIndex) {
+ numKeys = Math.min(numRowKeysToRead, maxOccupiedIndex - currentIndex + 1);
+ return numKeys;
+ }
+
+ // otherwise, get the next batch of rowkeys
+ if (rjbatch != null) {
+ Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
+
+ // note that the currentBatch could be null initially during the BUILD_SCHEMA phase
+ if (currentBatch != null) {
+ init(currentBatch);
+ numKeys = Math.min(numRowKeysToRead, maxOccupiedIndex - currentIndex + 1);
+ }
+ }
+
+ return numKeys;
+ }
+
+ /**
+ * Returns the ids of the rowKeys to be read.
+ * At most numRowKeysToRead ids are returned; fewer are returned
+ * if the current batch contains fewer than that many keys.
+ */
+ @JsonIgnore
+ public ByteBuffer[] getRowKeyIdsToRead(int numRowKeysToRead) {
+
+ int numKeys = hasRowKeys(numRowKeysToRead);
+ if (numKeys == 0) {
+ return null;
+ }
+
+ int index = 0;
+ final ByteBuffer[] rowKeyIds = new ByteBuffer[numKeys];
+
+ while (index < numKeys) {
+ Object o = rowKeyVector.getAccessor().getObject(currentIndex + index);
+ rowKeyIds[index++] = IdCodec.encode(o.toString());
+ }
+
+ updateRowKeysRead(numKeys);
+ return rowKeyIds;
+ }
+
+ /**
+ * Updates the index to reflect the number of keys read.
+ */
+ @JsonIgnore
+ public void updateRowKeysRead(int numKeys) {
+ currentIndex += numKeys;
+ }
+
+ /**
+ * Returns the next row key in the iteration.
+ * @return the next row key in the iteration or null if no more row keys
+ */
+ @JsonIgnore
+ public String nextRowKey() {
+ if (hasRowKey()) {
+ // get the entry at the current index within this batch
+ Object o = rowKeyVector.getAccessor().getObject(currentIndex++);
+ if (o == null) {
+ throw new DrillRuntimeException("Encountered a null row key during restricted subscan!");
+ }
+
+ // this is specific to the way the hash join maintains its entries. once we have reached the max
+ // occupied index within a batch, move to the next one and reset the current index to 0
+ // TODO: we should try to abstract this out
+ if (currentIndex > maxOccupiedIndex) {
+ Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
+ if (currentBatch != null) {
+ init(currentBatch);
+ }
+ }
+
+ return o.toString();
+ }
+ return null;
+ }
+
+}
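A hypothetical driver loop for the chunked iterator above: getRowKeyIdsToRead() both pulls the next row-key batch when needed and advances the internal index, so a restricted reader can drain the join's row keys like this.

import java.nio.ByteBuffer;

class RestrictedScanDriverSketch {
  // 4096 is an illustrative chunk size; per the commit message, the restricted
  // reader now sizes chunks from the row count of the join's right side instead
  // of a fixed constant.
  static void drain(RestrictedMapRDBSubScanSpec spec) {
    ByteBuffer[] ids;
    while ((ids = spec.getRowKeyIdsToRead(4096)) != null) {
      for (ByteBuffer id : ids) {
        // ... issue a point lookup (skip-scan) for each encoded _id ...
      }
    }
  }
}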
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index 086ae21..a135464 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.List;
import java.util.TreeMap;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -31,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.planner.index.Statistics;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
@@ -130,7 +133,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
tableStats = new MapRDBTableStats(getHBaseConf(), hbaseScanSpec.getTableName());
}
boolean foundStartRegion = false;
- regionsToScan = new TreeMap<>();
+ final TreeMap<TabletFragmentInfo, String> regionsToScan = new TreeMap<TabletFragmentInfo, String>();
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
for (HRegionLocation regionLocation : regionLocations) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
@@ -143,6 +146,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
break;
}
}
+ setRegionsToScan(regionsToScan);
} catch (Exception e) {
throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
}
@@ -154,11 +158,13 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
HBaseScanSpec spec = hbaseScanSpec;
MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
spec.getTableName(),
- regionsToScan.get(tfi),
+ null /* indexFid */,
+ getRegionsToScan().get(tfi),
(!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
(!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
spec.getSerializedFilter(),
- null);
+ null,
+ getUserName());
return subScanSpec;
}
@@ -191,6 +197,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
return getFormatPlugin().getHBaseConf();
}
+ @Override
@JsonIgnore
public String getTableName() {
return getHBaseScanSpec().getTableName();
@@ -213,4 +220,25 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
return hbaseScanSpec;
}
+ @Override
+ public void setRowCount(RexNode condition, double count, double capRowCount) {
+ throw new UnsupportedOperationException("setRowCount() not implemented for BinaryTableGroupScan");
+ }
+
+ @Override
+ public double getRowCount(RexNode condition, RelNode scanRel) {
+ return Statistics.ROWCOUNT_UNKNOWN;
+ }
+
+ @Override
+ public Statistics getStatistics() {
+ throw new UnsupportedOperationException("getStatistics() not implemented for BinaryTableGroupScan");
+ }
+
+ @Override
+ @JsonIgnore
+ public boolean isIndexScan() {
+ return false;
+ }
+
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/AllTextValueWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/AllTextValueWriter.java
new file mode 100644
index 0000000..bbb2aec
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/AllTextValueWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapOrListWriter;
+import org.ojai.DocumentReader;
+
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+import io.netty.buffer.DrillBuf;
+
+public class AllTextValueWriter extends OjaiValueWriter {
+
+ public AllTextValueWriter(DrillBuf buffer) {
+ super(buffer);
+ }
+
+ protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, reader.getTimestamp().toUTCString());
+ }
+
+ protected void writeTime(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, reader.getTime().toTimeStr());
+ }
+
+ protected void writeDate(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, reader.getDate().toDateStr());
+ }
+
+ protected void writeDouble(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getDouble()));
+ }
+
+ protected void writeFloat(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getFloat()));
+ }
+
+ protected void writeLong(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getLong()));
+ }
+
+ protected void writeInt(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getInt()));
+ }
+
+ protected void writeShort(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getShort()));
+ }
+
+ protected void writeByte(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getByte()));
+ }
+
+ protected void writeBoolean(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeString(writer, fieldName, String.valueOf(reader.getBoolean()));
+ }
+
+ protected void writeBinary(MapOrListWriter writer, String fieldName, ByteBuffer buf) {
+ writeString(writer, fieldName, Bytes.toString(buf));
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
index 5dab14e..a4cb0bd 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.mapr.db.json;
+import static com.mapr.db.rowcol.DBValueBuilderImpl.KeyValueBuilder;
+
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -41,6 +43,9 @@ import org.ojai.types.OTime;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import com.mapr.db.rowcol.KeyValueBuilder;
+import com.mapr.db.util.SqlHelper;
+
+import org.ojai.types.OTimestamp;
class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
@@ -109,7 +114,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
}
if (valueArg instanceof QuotedString) {
- this.value = KeyValueBuilder.initFrom(((QuotedString) valueArg).value);
+ this.value = SqlHelper.decodeStringAsValue(((QuotedString) valueArg).value);
this.path = path;
return true;
}
@@ -182,11 +187,9 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
}
if (valueArg instanceof TimeStampExpression) {
- // disable pushdown of TimeStampExpression type until bug 22824 is fixed.
- //
- // this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp()));
- // this.path = path;
- // return true;
+ this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp()));
+ this.path = path;
+ return true;
}
return false;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/DocumentReaderVectorWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/DocumentReaderVectorWriter.java
new file mode 100644
index 0000000..1c9dead
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/DocumentReaderVectorWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+/**
+ * Base class for writing a single OJAI Document to Drill's Value Vectors.
+ */
+abstract class DocumentReaderVectorWriter {
+ protected static final Logger logger = LoggerFactory.getLogger(DocumentReaderVectorWriter.class);
+
+ protected final OjaiValueWriter valueWriter;
+
+ protected DocumentReaderVectorWriter(final OjaiValueWriter valueWriter) {
+ this.valueWriter = valueWriter;
+ }
+
+ protected abstract void writeDBDocument(final VectorContainerWriter writer, final DBDocumentReaderBase reader)
+ throws SchemaChangeException;
+
+}
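
DocumentReaderVectorWriter is a small template-method base: subclasses decide how much of each document reaches the vectors. A hedged sketch of the shape a subclass takes (the class name and behavior here are illustrative only; the real implementations are FieldTransferVectorWriter and IdOnlyVectorWriter below):

    // Hypothetical subclass, shown only to illustrate the contract.
    class NoOpVectorWriter extends DocumentReaderVectorWriter {
      protected NoOpVectorWriter(final OjaiValueWriter valueWriter) {
        super(valueWriter);
      }

      @Override
      protected void writeDBDocument(final VectorContainerWriter writer,
          final DBDocumentReaderBase reader) throws SchemaChangeException {
        // A real implementation consumes the reader's event stream and emits
        // values through this.valueWriter; this one intentionally writes nothing.
      }
    }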
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldPathHelper.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldPathHelper.java
new file mode 100644
index 0000000..d8d3792
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldPathHelper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.Stack;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.ojai.FieldPath;
+import org.ojai.FieldSegment;
+
+public class FieldPathHelper {
+
+ /**
+ * Returns {@link SchemaPath} equivalent of the specified {@link FieldPath}.
+ */
+ public static SchemaPath fieldPath2SchemaPath(FieldPath fieldPath) {
+ Stack<FieldSegment> fieldSegments = new Stack<FieldSegment>();
+ FieldSegment seg = fieldPath.getRootSegment();
+ while (seg != null) {
+ fieldSegments.push(seg);
+ seg = seg.getChild();
+ }
+
+ PathSegment child = null;
+ while (!fieldSegments.isEmpty()) {
+ seg = fieldSegments.pop();
+ if (seg.isNamed()) {
+ child = new PathSegment.NameSegment(((FieldSegment.NameSegment)seg).getName(), child);
+ } else {
+ child = new PathSegment.ArraySegment(((FieldSegment.IndexSegment)seg).getIndex(), child);
+ }
+ }
+ return new SchemaPath((PathSegment.NameSegment)child);
+ }
+
+ /**
+ * Returns {@link FieldPath} equivalent of the specified {@link SchemaPath}.
+ */
+ public static FieldPath schemaPath2FieldPath(SchemaPath column) {
+ Stack<PathSegment> pathSegments = new Stack<PathSegment>();
+ PathSegment seg = column.getRootSegment();
+ while (seg != null) {
+ pathSegments.push(seg);
+ seg = seg.getChild();
+ }
+
+ FieldSegment child = null;
+ while (!pathSegments.isEmpty()) {
+ seg = pathSegments.pop();
+ if (seg.isNamed()) {
+ child = new FieldSegment.NameSegment(((PathSegment.NameSegment)seg).getPath(), child, false);
+ } else {
+ child = new FieldSegment.IndexSegment(((PathSegment.ArraySegment)seg).getIndex(), child);
+ }
+ }
+ return new FieldPath((FieldSegment.NameSegment) child);
+ }
+
+}
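
A short usage sketch of the two conversions above; the path string is an arbitrary example and FieldPath.parseFrom is the standard OJAI parser:

    // Round-trip an OJAI path through Drill's SchemaPath representation.
    FieldPath fp = FieldPath.parseFrom("a.b[1].c");
    SchemaPath sp = FieldPathHelper.fieldPath2SchemaPath(fp);
    FieldPath back = FieldPathHelper.schemaPath2FieldPath(sp);
    assert fp.asPathString().equals(back.asPathString());

Both methods walk the segment chain from the root onto a stack and rebuild it leaf-first, since both path types are singly linked from the root.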
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldTransferVectorWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldTransferVectorWriter.java
new file mode 100644
index 0000000..67bbcb3
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/FieldTransferVectorWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.dataReadError;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.ojai.DocumentReader.EventType;
+
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+/**
+ * This implementation of DocumentReaderVectorWriter transfers the OJAI Document to Drill
+ * Value Vectors field by field.
+ */
+class FieldTransferVectorWriter extends DocumentReaderVectorWriter {
+
+ protected FieldTransferVectorWriter(final OjaiValueWriter valueWriter) {
+ super(valueWriter);
+ }
+
+ @Override
+ protected void writeDBDocument(VectorContainerWriter vectorWriter, DBDocumentReaderBase reader)
+ throws SchemaChangeException {
+ MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap());
+ if (reader.next() != EventType.START_MAP) {
+ throw dataReadError(logger, "The document did not start with START_MAP!");
+ }
+ valueWriter.writeToListOrMap(writer, reader);
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/IdOnlyVectorWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/IdOnlyVectorWriter.java
new file mode 100644
index 0000000..2c9f762
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/IdOnlyVectorWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.schemaChangeException;
+import static org.ojai.DocumentConstants.ID_KEY;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.ojai.Value;
+
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+/**
+ * This implementation of DocumentReaderVectorWriter writes only the "_id" field from the OJAI
+ * Document to Drill Value Vectors. This is useful for "_id"-only queries.
+ */
+class IdOnlyVectorWriter extends DocumentReaderVectorWriter {
+
+ protected IdOnlyVectorWriter(final OjaiValueWriter valueWriter) {
+ super(valueWriter);
+ }
+
+ @Override
+ public void writeDBDocument(VectorContainerWriter vectorWriter, DBDocumentReaderBase reader)
+ throws SchemaChangeException {
+ MapWriter writer = vectorWriter.rootAsMap();
+
+ Value id = reader.getId();
+ try {
+ switch(id.getType()) {
+ case STRING:
+ valueWriter.writeString(writer, ID_KEY, id.getString());
+ break;
+ case BINARY:
+ valueWriter.writeBinary(writer, ID_KEY, id.getBinary());
+ break;
+ default:
+ throw new UnsupportedOperationException(id.getType() +
+ " is not a supported type for _id field.");
+ }
+ } catch (IllegalStateException | IllegalArgumentException e) {
+ throw schemaChangeException(logger, e, "Possible schema change at _id: '%s'", IdCodec.asString(id));
+ }
+
+ }
+
+}
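
The record reader (patched later in this commit) selects one of these writers once the projection is known. A hedged sketch of that dispatch, using the reader's idOnly flag:

    // Sketch: choose the document writer based on the projection.
    DocumentReaderVectorWriter documentWriter = idOnly
        ? new IdOnlyVectorWriter(valueWriter)         // only "_id" reaches the vectors
        : new FieldTransferVectorWriter(valueWriter); // full field-by-field transfer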
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
index 3b64bf9..ce4e677 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
@@ -22,13 +22,13 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.mapr.db.util.FieldPathHelper;
import org.ojai.Value;
import org.ojai.store.QueryCondition;
import org.ojai.store.QueryCondition.Op;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import com.mapr.db.MapRDB;
+import com.mapr.db.impl.MapRDBImpl;
public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
@@ -88,14 +88,27 @@ public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void
JsonScanSpec nextScanSpec = args.get(i).accept(this, null);
if (nodeScanSpec != null && nextScanSpec != null) {
nodeScanSpec.mergeScanSpec(functionName, nextScanSpec);
- } else {
- allExpressionsConverted = false;
- if ("booleanAnd".equals(functionName)) {
+ } else {
+ allExpressionsConverted = false;
+ if ("booleanAnd".equals(functionName)) {
nodeScanSpec = nodeScanSpec == null ? nextScanSpec : nodeScanSpec;
}
}
}
break;
+
+ case "ojai_sizeof":
+ case "ojai_typeof":
+ case "ojai_nottypeof":
+ case "ojai_matches":
+ case "ojai_notmatches":
+ case "ojai_condition": {
+ final OjaiFunctionsProcessor processor = OjaiFunctionsProcessor.process(call);
+ if (processor != null) {
+ return new JsonScanSpec(groupScan.getTableName(), groupScan.getIndexDesc(),
+ processor.getCondition());
+ }
+ }
}
}
@@ -159,80 +172,76 @@ public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void
private JsonScanSpec createJsonScanSpec(FunctionCall call,
CompareFunctionsProcessor processor) {
String functionName = processor.getFunctionName();
- String fieldPath = FieldPathHelper.schemaPathToFieldPath(processor.getPath()).asPathString();
+ String fieldPath = FieldPathHelper.schemaPath2FieldPath(processor.getPath()).asPathString();
Value fieldValue = processor.getValue();
QueryCondition cond = null;
switch (functionName) {
case "equal":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.EQUAL, fieldValue);
- cond.build();
break;
case "not_equal":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.NOT_EQUAL, fieldValue);
- cond.build();
break;
case "less_than":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.LESS, fieldValue);
- cond.build();
break;
case "less_than_or_equal_to":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.LESS_OR_EQUAL, fieldValue);
- cond.build();
break;
case "greater_than":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.GREATER, fieldValue);
- cond.build();
break;
case "greater_than_or_equal_to":
- cond = MapRDB.newCondition();
+ cond = MapRDBImpl.newCondition();
setIsCondition(cond, fieldPath, Op.GREATER_OR_EQUAL, fieldValue);
- cond.build();
break;
case "isnull":
- cond = MapRDB.newCondition().notExists(fieldPath).build();
+ cond = MapRDBImpl.newCondition().notExists(fieldPath);
break;
case "isnotnull":
- cond = MapRDB.newCondition().exists(fieldPath).build();
+ cond = MapRDBImpl.newCondition().exists(fieldPath);
break;
case "istrue":
- cond = MapRDB.newCondition().is(fieldPath, Op.EQUAL, true).build();
+ cond = MapRDBImpl.newCondition().is(fieldPath, Op.EQUAL, true);
break;
case "isnotfalse":
- cond = MapRDB.newCondition().is(fieldPath, Op.NOT_EQUAL, false).build();
+ cond = MapRDBImpl.newCondition().is(fieldPath, Op.NOT_EQUAL, false);
break;
case "isfalse":
- cond = MapRDB.newCondition().is(fieldPath, Op.EQUAL, false).build();
+ cond = MapRDBImpl.newCondition().is(fieldPath, Op.EQUAL, false);
break;
case "isnottrue":
- cond = MapRDB.newCondition().is(fieldPath, Op.NOT_EQUAL, true).build();
+ cond = MapRDBImpl.newCondition().is(fieldPath, Op.NOT_EQUAL, true);
break;
case "like":
- cond = MapRDB.newCondition().like(fieldPath, fieldValue.getString()).build();
+ cond = MapRDBImpl.newCondition().like(fieldPath, fieldValue.getString());
break;
default:
}
if (cond != null) {
- return new JsonScanSpec(groupScan.getTableName(), cond);
+ return new JsonScanSpec(groupScan.getTableName(),
+ groupScan.getIndexDesc(),
+ cond.build());
}
return null;
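
Note the shape change above: build() is no longer invoked per comparison branch; the single cond.build() now happens when the JsonScanSpec is constructed. A minimal sketch of the resulting pattern, using only calls that appear in this file (the table path and field name are illustrative):

    // Sketch: conditions are assembled unbuilt, then built exactly once.
    QueryCondition cond = MapRDBImpl.newCondition().is("active", Op.EQUAL, true);
    JsonScanSpec spec = new JsonScanSpec("/tables/users", null /* indexDesc */, cond.build());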
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
index 6858394..8d06f17 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
@@ -17,50 +17,78 @@
*/
package org.apache.drill.exec.store.mapr.db.json;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.ojai.store.QueryCondition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.db.MapRDB;
import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.ConditionNode.RowkeyRange;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.index.IndexDesc;
public class JsonScanSpec {
- protected String tableName;
- protected QueryCondition condition;
-
- @JsonCreator
- public JsonScanSpec(@JsonProperty("tableName") String tableName,
- @JsonProperty("condition") QueryCondition condition) {
- this.tableName = tableName;
- this.condition = condition;
+ protected String tableName;
+ protected IndexDesc indexDesc;
+ protected QueryCondition condition;
+ protected byte[] startRow;
+ protected byte[] stopRow;
+
+ @JsonCreator
+ public JsonScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("indexDesc") IndexDesc indexDesc,
+ @JsonProperty("condition") QueryCondition condition) {
+ this.tableName = tableName;
+ this.indexDesc = indexDesc;
+ this.condition = condition;
+ if (this.condition != null) {
+ List<RowkeyRange> rkRanges = ((ConditionImpl)this.condition).getRowkeyRanges();
+ if (rkRanges.size() > 0) {
+ startRow = rkRanges.get(0).getStartRow();
+ stopRow = rkRanges.get(rkRanges.size() - 1).getStopRow();
+ } else {
+ startRow = HConstants.EMPTY_START_ROW;
+ stopRow = HConstants.EMPTY_END_ROW;
+ }
+ }
}
public String getTableName() {
return this.tableName;
}
+ public IndexDesc getIndexDesc() {
+ return this.indexDesc;
+ }
+
+ public void setStartRow(byte []startRow) {
+ this.startRow = startRow;
+ }
+
+ public void setStopRow(byte []stopRow) {
+ this.stopRow = stopRow;
+ }
+
public byte[] getStartRow() {
- if (condition == null) {
- return HConstants.EMPTY_START_ROW;
- }
- return ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStartRow();
+ return this.startRow;
}
public byte[] getStopRow() {
- if (condition == null) {
- return HConstants.EMPTY_END_ROW;
- }
-
- return ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStopRow();
+ return this.stopRow;
}
- public Object getSerializedFilter() {
+ public byte[] getSerializedFilter() {
if (this.condition != null) {
- return ((ConditionImpl)this.condition).getDescriptor().getSerialized();
+ ByteBuffer bbuf = ((ConditionImpl)this.condition).getDescriptor().getSerialized();
+ byte[] serFilter = new byte[bbuf.limit() - bbuf.position()];
+ bbuf.get(serFilter);
+ return serFilter;
}
-
return null;
}
@@ -73,10 +101,23 @@ public class JsonScanSpec {
return this.condition;
}
- public void mergeScanSpec(String functionName, JsonScanSpec scanSpec) {
+ public boolean isSecondaryIndex() {
+ return (this.indexDesc != null);
+ }
+
+ @JsonIgnore
+ public Path getPrimaryTablePath() {
+ return (this.indexDesc == null) ? null : new Path(this.indexDesc.getPrimaryTablePath());
+ }
+
+ @JsonIgnore
+ public String getIndexName() {
+ return (this.indexDesc == null) ? null : this.indexDesc.getIndexName();
+ }
+ public void mergeScanSpec(String functionName, JsonScanSpec scanSpec) {
if (this.condition != null && scanSpec.getCondition() != null) {
- QueryCondition newCond = MapRDB.newCondition();
+ QueryCondition newCond = MapRDBImpl.newCondition();
switch (functionName) {
case "booleanAnd":
newCond.and();
@@ -84,7 +125,7 @@ public class JsonScanSpec {
case "booleanOr":
newCond.or();
break;
- default:
+ default:
assert(false);
}
@@ -98,11 +139,13 @@ public class JsonScanSpec {
this.condition = scanSpec.getCondition();
}
}
-
+
@Override
public String toString() {
+ String fidInfo = (getIndexDesc() != null)? ", indexName=" + getIndexName() : "";
return "JsonScanSpec [tableName=" + tableName
+ ", condition=" + (condition == null ? null : condition.toString())
+ + fidInfo
+ "]";
}
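
getSerializedFilter() now returns a byte[] copy of the condition descriptor rather than the raw ByteBuffer, making the value safe to ship inside a sub-scan. A hedged round-trip sketch, using the same parse call the record reader later in this commit uses:

    // Serialize on the planning side...
    byte[] serFilter = scanSpec.getSerializedFilter();
    // ...and reconstitute on the execution side (see MaprDBJsonRecordReader below).
    QueryCondition cond = (serFilter == null) ? null
        : com.mapr.db.impl.ConditionImpl.parseFrom(com.mapr.db.util.ByteBufs.wrap(serFilter));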
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
index 3fe0a3b..1e7eb31 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
@@ -18,75 +18,38 @@
package org.apache.drill.exec.store.mapr.db.json;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
-import org.apache.hadoop.hbase.HConstants;
-import org.ojai.DocumentConstants;
-import org.ojai.Value;
import org.ojai.store.QueryCondition;
-import org.ojai.store.QueryCondition.Op;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.db.MapRDB;
import com.mapr.db.impl.ConditionImpl;
-import com.mapr.db.impl.IdCodec;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.index.IndexDesc;
+/**
+ * This class is a helper extension of the {@link MapRDBSubScanSpec} class and is never
+ * serialized or deserialized.
+ */
public class JsonSubScanSpec extends MapRDBSubScanSpec {
protected QueryCondition condition;
- @JsonCreator
- public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
- @JsonProperty("regionServer") String regionServer,
- @JsonProperty("startRow") byte[] startRow,
- @JsonProperty("stopRow") byte[] stopRow,
- @JsonProperty("cond") QueryCondition cond) {
- super(tableName, regionServer, null, null, null, null);
-
- this.condition = MapRDB.newCondition().and();
-
- if (cond != null) {
- this.condition.condition(cond);
- }
+ public JsonSubScanSpec(String tableName, IndexDesc indexDesc, String regionServer,
+ QueryCondition scanRangeCondition, QueryCondition userCondition,
+ byte[] startRow, byte[] stopRow, String userName) {
+ super(tableName, indexDesc, regionServer, startRow, stopRow, null, null, userName);
- if (startRow != null &&
- Arrays.equals(startRow, HConstants.EMPTY_START_ROW) == false) {
- Value startVal = IdCodec.decode(startRow);
+ condition = MapRDBImpl.newCondition().and();
- switch(startVal.getType()) {
- case BINARY:
- this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getBinary());
- break;
- case STRING:
- this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getString());
- break;
- default:
- throw new IllegalStateException("Encountered an unsupported type " + startVal.getType()
- + " for _id");
- }
+ if (userCondition != null && !userCondition.isEmpty()) {
+ condition.condition(userCondition);
}
-
- if (stopRow != null &&
- Arrays.equals(stopRow, HConstants.EMPTY_END_ROW) == false) {
- Value stopVal = IdCodec.decode(stopRow);
-
- switch(stopVal.getType()) {
- case BINARY:
- this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getBinary());
- break;
- case STRING:
- this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getString());
- break;
- default:
- throw new IllegalStateException("Encountered an unsupported type " + stopVal.getType()
- + " for _id");
- }
+ if (scanRangeCondition != null && !scanRangeCondition.isEmpty()) {
+ condition.condition(scanRangeCondition);
}
- this.condition.close().build();
+ condition.close().build();
}
public void setCondition(QueryCondition cond) {
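
The rewritten constructor above ANDs two inputs: the scan-range condition derived from the tablet and the user's pushed-down filter, skipping either one if it is null or empty. A sketch of the composition, assuming both conditions are present:

    // Sketch: combined = (userCondition AND scanRangeCondition), built once.
    QueryCondition combined = MapRDBImpl.newCondition().and();
    combined.condition(userCondition);       // pushed-down filter
    combined.condition(scanRangeCondition);  // tablet row-key bounds
    combined.close().build();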
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index c0274e0..a269256 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -17,29 +17,52 @@
*/
package org.apache.drill.exec.store.mapr.db.json;
-import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty;
+import static org.apache.drill.exec.planner.index.Statistics.ROWCOUNT_HUGE;
+import static org.apache.drill.exec.planner.index.Statistics.ROWCOUNT_UNKNOWN;
+import static org.apache.drill.exec.planner.index.Statistics.AVG_ROWSIZE_UNKNOWN;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
import java.util.TreeMap;
+import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.ConditionNode.RowkeyRange;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.planner.index.Statistics;
+import org.apache.drill.exec.planner.index.MapRDBStatistics;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.PluginConstants;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.util.Utilities;
import org.codehaus.jackson.annotate.JsonCreator;
+import org.ojai.store.QueryCondition;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -47,21 +70,46 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.mapr.db.MapRDB;
+import com.mapr.db.MetaTable;
import com.mapr.db.Table;
-import com.mapr.db.TabletInfo;
import com.mapr.db.impl.TabletInfoImpl;
+import com.mapr.db.index.IndexDesc;
+import com.mapr.db.scan.ScanRange;
@JsonTypeName("maprdb-json-scan")
-public class JsonTableGroupScan extends MapRDBGroupScan {
+public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableGroupScan.class);
+ public static final int STAR_COLS = 100;
public static final String TABLE_JSON = "json";
+ /*
+ * The <forcedRowCountMap> maintains a mapping of <RexNode, RowCount>. These row counts take precedence
+ * over anything computed using <MapRDBStatistics> stats. Currently, it is used for picking index plans
+ * with the index_selectivity_factor. We forcibly set the full table rows as HUGE <Statistics.ROWCOUNT_HUGE>
+ * in this map when the selectivity of the index is lower than index_selectivity_factor. During costing,
+ * the table row count is returned as HUGE instead of the correct <stats> row count, which leads the
+ * planner to choose the cheaper index plans.
+ * NOTE: Full table row counts are specified with the NULL condition, e.g. forcedRowCountMap<NULL, 1000>.
+ */
+ protected Map<RexNode, Double> forcedRowCountMap;
+ /*
+ * This stores the statistics associated with this GroupScan. Please note that the stats must be initialized
+ * before using it to compute filter row counts based on query conditions.
+ */
+ protected MapRDBStatistics stats;
+ protected JsonScanSpec scanSpec;
+ protected double fullTableRowCount;
+ protected double fullTableEstimatedSize;
- private long totalRowCount;
- private Table table;
- private TabletInfo[] tabletInfos;
+ /**
+ * Maximum number of records to read; -1 means no limit has been pushed down.
+ */
+ protected int maxRecordsToRead = -1;
- private JsonScanSpec scanSpec;
+ /**
+ * Forced parallelization width
+ */
+ protected int parallelizationWidth = -1;
@JsonCreator
public JsonTableGroupScan(@JsonProperty("userName") final String userName,
@@ -80,6 +128,18 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
super(storagePlugin, formatPlugin, columns, userName);
this.scanSpec = scanSpec;
+ this.stats = new MapRDBStatistics();
+ this.forcedRowCountMap = new HashMap<>();
+ init();
+ }
+
+ public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+ MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns,
+ MapRDBStatistics stats) {
+ super(storagePlugin, formatPlugin, columns, userName);
+ this.scanSpec = scanSpec;
+ this.stats = stats;
+ this.forcedRowCountMap = new HashMap<>();
init();
}
@@ -87,16 +147,17 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
* Private constructor, used for cloning.
* @param that The HBaseGroupScan to clone
*/
- private JsonTableGroupScan(JsonTableGroupScan that) {
+ protected JsonTableGroupScan(JsonTableGroupScan that) {
super(that);
this.scanSpec = that.scanSpec;
this.endpointFragmentMapping = that.endpointFragmentMapping;
-
- // Reusing the table handle, tabletInfos and totalRowCount saves expensive
- // calls to MapR DB client to get them again.
- this.table = that.table;
- this.tabletInfos = that.tabletInfos;
- this.totalRowCount = that.totalRowCount;
+ this.stats = that.stats;
+ this.fullTableRowCount = that.fullTableRowCount;
+ this.fullTableEstimatedSize = that.fullTableEstimatedSize;
+ this.forcedRowCountMap = that.forcedRowCountMap;
+ this.maxRecordsToRead = that.maxRecordsToRead;
+ this.parallelizationWidth = that.parallelizationWidth;
+ init();
}
@Override
@@ -106,71 +167,122 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
return newScan;
}
- /**
- * Create a new groupScan, which is a clone of this.
- * Initialize scanSpec.
- * We should recompute regionsToScan as it depends upon scanSpec.
- * @param scanSpec
- */
- public JsonTableGroupScan clone(JsonScanSpec scanSpec) {
+ public GroupScan clone(JsonScanSpec scanSpec) {
JsonTableGroupScan newScan = new JsonTableGroupScan(this);
newScan.scanSpec = scanSpec;
- newScan.computeRegionsToScan();
+ newScan.resetRegionsToScan(); // resetting will force recalculation
return newScan;
}
- /**
- * Compute regions to scan based on the scanSpec
- */
- private void computeRegionsToScan() {
- boolean foundStartRegion = false;
-
- regionsToScan = new TreeMap<TabletFragmentInfo, String>();
- for (TabletInfo tabletInfo : tabletInfos) {
- TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
- if (!foundStartRegion && !isNullOrEmpty(scanSpec.getStartRow()) && !tabletInfoImpl.containsRow(scanSpec.getStartRow())) {
- continue;
- }
- foundStartRegion = true;
- regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), tabletInfo.getLocations()[0]);
- if (!isNullOrEmpty(scanSpec.getStopRow()) && tabletInfoImpl.containsRow(scanSpec.getStopRow())) {
- break;
+ private void init() {
+ try {
+ // Get the fullTableRowCount only once, i.e. only if it has not already been obtained.
+ if (fullTableRowCount == 0) {
+ final Table t = this.formatPlugin.getJsonTableCache().getTable(
+ scanSpec.getTableName(), scanSpec.getIndexDesc(), getUserName());
+ final MetaTable metaTable = t.getMetaTable();
+ // For condition null, we get full table stats.
+ com.mapr.db.scan.ScanStats stats = metaTable.getScanStats();
+ fullTableRowCount = stats.getEstimatedNumRows();
+ fullTableEstimatedSize = stats.getEstimatedSize();
+ // The MapR-DB client can return an invalid rowCount, i.e. 0, especially right after table
+ // creation; it takes 15 minutes before table stats are obtained and cached in the client.
+ // If we get a rowCount of 0, fall back to getting the rowCount using the old admin API.
+ if (fullTableRowCount == 0) {
+ PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+ final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+ final int numColumns = (columns == null || columns.isEmpty() || Utilities.isStarQuery(columns)) ? STAR_COLS : columns.size();
+ MapRDBTableStats tableStats = new MapRDBTableStats(formatPlugin.getFsConf(), scanSpec.getTableName());
+ fullTableRowCount = tableStats.getNumRows();
+ fullTableEstimatedSize = fullTableRowCount * numColumns * avgColumnSize;
+ }
}
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error getting region info for table: " +
+ scanSpec.getTableName() + (scanSpec.getIndexDesc() == null ? "" : (", index: " + scanSpec.getIndexName())), e);
}
}
- private void init() {
- logger.debug("Getting tablet locations");
- try {
- Configuration conf = new Configuration();
+ protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
+ return getRegionsToScan(formatPlugin.getScanRangeSizeMB());
+ }
- // Fetch table and tabletInfo only once and cache.
- table = MapRDB.getTable(scanSpec.getTableName());
- tabletInfos = table.getTabletInfos(scanSpec.getCondition());
+ protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan(int scanRangeSizeMB) {
+ // If regionsToScan already computed, just return.
+ double estimatedRowCount = ROWCOUNT_UNKNOWN;
+ if (doNotAccessRegionsToScan == null) {
+ final Table t = this.formatPlugin.getJsonTableCache().getTable(
+ scanSpec.getTableName(), scanSpec.getIndexDesc(), getUserName());
+ final MetaTable metaTable = t.getMetaTable();
- // Calculate totalRowCount for the table from tabletInfos estimatedRowCount.
- // This will avoid calling expensive MapRDBTableStats API to get total rowCount, avoiding
- // duplicate work and RPCs to MapR DB server.
- for (TabletInfo tabletInfo : tabletInfos) {
- totalRowCount += tabletInfo.getEstimatedNumRows();
+ QueryCondition scanSpecCondition = scanSpec.getCondition();
+ List<ScanRange> scanRanges = (scanSpecCondition == null)
+ ? metaTable.getScanRanges(scanRangeSizeMB)
+ : metaTable.getScanRanges(scanSpecCondition, scanRangeSizeMB);
+ logger.debug("getRegionsToScan() with scanSpec {}: table={}, index={}, condition={}, sizeMB={}, #ScanRanges={}",
+ System.identityHashCode(scanSpec), scanSpec.getTableName(), scanSpec.getIndexName(),
+ scanSpec.getCondition() == null ? "null" : scanSpec.getCondition(), scanRangeSizeMB,
+ scanRanges == null ? "null" : scanRanges.size());
+ final TreeMap<TabletFragmentInfo, String> regionsToScan = new TreeMap<>();
+ if (isIndexScan()) {
+ String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(),
+ scanSpec.getIndexDesc().getIndexName());
+ if (stats.isStatsAvailable()) {
+ estimatedRowCount = stats.getRowCount(scanSpec.getCondition(), idxIdentifier);
+ }
+ } else {
+ if (stats.isStatsAvailable()) {
+ estimatedRowCount = stats.getRowCount(scanSpec.getCondition(), null);
+ }
}
+ // If limit pushdown has occurred, factor it into the row count
+ if (this.maxRecordsToRead > 0) {
+ estimatedRowCount = Math.min(estimatedRowCount, this.maxRecordsToRead);
+ }
+ // If the estimated row count > 0 then scan ranges must be > 0
+ Preconditions.checkState(estimatedRowCount == ROWCOUNT_UNKNOWN
+ || estimatedRowCount == 0 || (scanRanges != null && scanRanges.size() > 0),
+ String.format("#Scan ranges should be greater than 0 since estimated rowcount=[%f]", estimatedRowCount));
+ if (scanRanges != null && scanRanges.size() > 0) {
+ // set the start-row of the scanspec as the start-row of the first scan range
+ ScanRange firstRange = scanRanges.get(0);
+ QueryCondition firstCondition = firstRange.getCondition();
+ byte[] firstStartRow = ((ConditionImpl) firstCondition).getRowkeyRanges().get(0).getStartRow();
+ scanSpec.setStartRow(firstStartRow);
- computeRegionsToScan();
+ // set the stop-row of ScanSpec as the stop-row of the last scan range
+ ScanRange lastRange = scanRanges.get(scanRanges.size() - 1);
+ QueryCondition lastCondition = lastRange.getCondition();
+ List<RowkeyRange> rowkeyRanges = ((ConditionImpl) lastCondition).getRowkeyRanges();
+ byte[] lastStopRow = rowkeyRanges.get(rowkeyRanges.size() - 1).getStopRow();
+ scanSpec.setStopRow(lastStopRow);
- } catch (Exception e) {
- throw new DrillRuntimeException("Error getting region info for table: " + scanSpec.getTableName(), e);
+ for (ScanRange range : scanRanges) {
+ TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) range;
+ regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), range.getLocations()[0]);
+ }
+ }
+ setRegionsToScan(regionsToScan);
}
+ return doNotAccessRegionsToScan;
}
- protected JsonSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+ protected MapRDBSubScanSpec getSubScanSpec(final TabletFragmentInfo tfi) {
// XXX/TODO check filter/Condition
- JsonScanSpec spec = scanSpec;
+ final JsonScanSpec spec = scanSpec;
+ final String serverHostName = getRegionsToScan().get(tfi);
+ QueryCondition condition = tfi.getTabletInfoImpl().getCondition();
+ byte[] startRow = condition == null ? null : ((ConditionImpl) condition).getRowkeyRanges().get(0).getStartRow();
+ byte[] stopRow = condition == null ? null : ((ConditionImpl) condition).getRowkeyRanges().get(0).getStopRow();
JsonSubScanSpec subScanSpec = new JsonSubScanSpec(
spec.getTableName(),
- regionsToScan.get(tfi),
- (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
- (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
- spec.getCondition());
+ spec.getIndexDesc(),
+ serverHostName,
+ tfi.getTabletInfoImpl().getCondition(),
+ spec.getCondition(),
+ startRow,
+ stopRow,
+ getUserName());
return subScanSpec;
}
@@ -179,16 +291,72 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
- return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
+ return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);
}
@Override
public ScanStats getScanStats() {
- //TODO: look at stats for this.
- long rowCount = (long) ((scanSpec.getSerializedFilter() != null ? .5 : 1) * totalRowCount);
- int avgColumnSize = 10;
- int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+ return fullTableScanStats();
+ }
+
+ private ScanStats fullTableScanStats() {
+ PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+ final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+ final int numColumns = (columns == null || columns.isEmpty()) ? STAR_COLS : columns.size();
+ // index will be NULL for FTS
+ double rowCount = stats.getRowCount(scanSpec.getCondition(), null);
+ // Row count based on the _id predicate. If NO _id predicate is present in the condition, the
+ // row count should be the same as totalRowCount. Equality between the two row counts should not
+ // be construed as the absence of an _id predicate, since stats are approximate.
+ double leadingRowCount = stats.getLeadingRowCount(scanSpec.getCondition(), null);
+ double avgRowSize = stats.getAvgRowSize(null, true);
+ double totalRowCount = stats.getRowCount(null, null);
+ logger.debug("GroupScan {} with stats {}: rowCount={}, condition={}, totalRowCount={}, fullTableRowCount={}",
+ System.identityHashCode(this), System.identityHashCode(stats), rowCount,
+ scanSpec.getCondition()==null?"null":scanSpec.getCondition(),
+ totalRowCount, fullTableRowCount);
+ // If UNKNOWN, or DB stats sync issues (manifesting as 0 rows), use defaults.
+ if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0) {
+ rowCount = (scanSpec.getSerializedFilter() != null ? .5 : 1) * fullTableRowCount;
+ }
+ // If limit pushdown has occurred, factor it into the row count
+ if (this.maxRecordsToRead > 0) {
+ rowCount = Math.min(rowCount, this.maxRecordsToRead);
+ }
+ if (totalRowCount == ROWCOUNT_UNKNOWN || totalRowCount == 0) {
+ logger.debug("did not get valid totalRowCount, will take this: {}", fullTableRowCount);
+ totalRowCount = fullTableRowCount;
+ }
+ if (avgRowSize == AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+ avgRowSize = fullTableEstimatedSize/fullTableRowCount;
+ }
+ double totalBlocks = getNumOfBlocks(totalRowCount, fullTableEstimatedSize, avgRowSize, pluginCostModel);
+ double numBlocks = Math.min(totalBlocks, getNumOfBlocks(leadingRowCount, fullTableEstimatedSize, avgRowSize, pluginCostModel));
+ double diskCost = numBlocks * pluginCostModel.getSequentialBlockReadCost(this);
+ /*
+ * The table scan cost is made INFINITE in order to pick index plans. Use the MAX possible rowCount
+ * for costing purposes.
+ * NOTE: Full table row counts are specified with the NULL condition,
+ * e.g. forcedRowCountMap<NULL, 1000>.
+ */
+ if (forcedRowCountMap.get(null) != null && // Forced full table rowcount and it is HUGE
+ forcedRowCountMap.get(null) == ROWCOUNT_HUGE) {
+ rowCount = ROWCOUNT_HUGE;
+ diskCost = ROWCOUNT_HUGE;
+ }
+
+ logger.debug("JsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}",
+ this.getOperatorId(), rowCount, avgRowSize, numBlocks, totalBlocks, diskCost);
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+ }
+
+ private double getNumOfBlocks(double rowCount, double sizeFromDisk,
+ double avgRowSize, PluginCost pluginCostModel) {
+ if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0) {
+ return Math.ceil(sizeFromDisk / pluginCostModel.getBlockSize(this));
+ } else {
+ return Math.ceil(rowCount * avgRowSize / pluginCostModel.getBlockSize(this));
+ }
}
@Override
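
The disk cost above follows a simple block model: blocks = ceil(rowCount * avgRowSize / blockSize), further capped by the blocks implied by the leading row count. A worked sketch with assumed numbers (the block size and read cost come from the plugin cost model and will differ in practice):

    // Assumed: 1,000,000 rows at ~200 bytes/row with an 8 MiB block size.
    double rows = 1_000_000d, avgRowSize = 200d, blockSize = 8 * 1024 * 1024d;
    double numBlocks = Math.ceil(rows * avgRowSize / blockSize); // ceil(2.0e8 / 8.39e6) = 24
    double diskCost = numBlocks * 1.0; // 24 units at an assumed unit sequential-read cost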
@@ -198,11 +366,16 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
return new JsonTableGroupScan(this);
}
+ @Override
@JsonIgnore
public String getTableName() {
return scanSpec.getTableName();
}
+ public IndexDesc getIndexDesc() {
+ return scanSpec.getIndexDesc();
+ }
+
public boolean isDisablePushdown() {
return !formatPluginConfig.isEnablePushdown();
}
@@ -214,11 +387,189 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
@Override
public String toString() {
- return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + "]";
+ return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns
+ + (maxRecordsToRead > 0 ? ", limit=" + maxRecordsToRead : "")
+ + (getMaxParallelizationWidth() > 0 ? ", maxwidth=" + getMaxParallelizationWidth() : "") + "]";
}
public JsonScanSpec getScanSpec() {
return scanSpec;
}
+ @Override
+ public boolean supportsSecondaryIndex() {
+ return true;
+ }
+
+ @Override
+ @JsonIgnore
+ public boolean isIndexScan() {
+ return scanSpec != null && scanSpec.isSecondaryIndex();
+ }
+
+ @Override
+ public boolean supportsRestrictedScan() {
+ return true;
+ }
+
+ /**
+ * Set the row count resulting from applying the {@link RexNode} condition. Forced row counts take
+ * precedence over stats row counts.
+ * @param condition
+ * @param count
+ * @param capRowCount
+ */
+ @Override
+ @JsonIgnore
+ public void setRowCount(RexNode condition, double count, double capRowCount) {
+ forcedRowCountMap.put(condition, count);
+ }
+
+ @Override
+ public void setStatistics(Statistics statistics) {
+ assert statistics instanceof MapRDBStatistics : String.format(
+ "Passed unexpected statistics instance. Expects MAPR-DB Statistics instance");
+ this.stats = ((MapRDBStatistics) statistics);
+ }
+
+ /**
+ * Get the row count after applying the {@link RexNode} condition
+ * @param condition filter to apply
+ * @return row count post filtering
+ */
+ @Override
+ @JsonIgnore
+ public double getRowCount(RexNode condition, RelNode scanRel) {
+ // Do not use statistics if the row count is forced. Forced row counts take precedence over stats
+ double rowcount;
+ if (forcedRowCountMap.get(condition) != null) {
+ return forcedRowCountMap.get(condition);
+ }
+ if (scanSpec.getIndexDesc() != null) {
+ String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(),
+ scanSpec.getIndexName());
+ rowcount = stats.getRowCount(condition, idxIdentifier, scanRel);
+ } else {
+ rowcount = stats.getRowCount(condition, null, scanRel);
+ }
+ // Stats might NOT have the full row count (e.g. the table is newly populated and the DB stats
+ // APIs only return it after 15 mins). If needed, use the table row count as populated by the
+ // (expensive but accurate) HBase API.
+ if (condition == null && (rowcount == 0 || rowcount == ROWCOUNT_UNKNOWN)) {
+ rowcount = fullTableRowCount;
+ logger.debug("getRowCount: Stats not available yet! Use Admin APIs full table rowcount {}",
+ fullTableRowCount);
+ }
+ return rowcount;
+ }
+
+ @Override
+ public boolean isDistributed() {
+ // getMaxParallelizationWidth gets information about all regions to scan and is expensive.
+ // This option is meant to be used only for unit tests.
+ boolean useNumRegions = storagePlugin.getContext().getConfig().getBoolean(PluginConstants.JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING);
+ double fullTableSize;
+
+ if (useNumRegions) {
+ return getMaxParallelizationWidth() > 1;
+ }
+
+ // This function gets called multiple times during planning. To avoid performance
+ // bottleneck, estimate degree of parallelization using stats instead of actually getting information
+ // about all regions.
+ double rowCount, rowSize;
+ double scanRangeSize = storagePlugin.getContext().getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB) * 1024 * 1024;
+
+ if (scanSpec.getIndexDesc() != null) {
+ String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(), scanSpec.getIndexName());
+ rowCount = stats.getRowCount(scanSpec.getCondition(), idxIdentifier);
+ rowSize = stats.getAvgRowSize(idxIdentifier, false);
+ } else {
+ rowCount = stats.getRowCount(scanSpec.getCondition(), null);
+ rowSize = stats.getAvgRowSize(null, false);
+ }
+
+ if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0 ||
+ rowSize == AVG_ROWSIZE_UNKNOWN || rowSize == 0) {
+ fullTableSize = (scanSpec.getSerializedFilter() != null ? .5 : 1) * this.fullTableEstimatedSize;
+ } else {
+ fullTableSize = rowCount * rowSize;
+ }
+
+ return (long) fullTableSize / scanRangeSize > 1;
+ }
+
+ @Override
+ public MapRDBStatistics getStatistics() {
+ return stats;
+ }
+
+ @Override
+ @JsonIgnore
+ public void setColumns(List<SchemaPath> columns) {
+ this.columns = columns;
+ }
+
+ @Override
+ @JsonIgnore
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @Override
+ @JsonIgnore
+ public PartitionFunction getRangePartitionFunction(List<FieldReference> refList) {
+
+ return null;
+ //new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin());
+ }
+
+ /**
+ * Convert a given {@link LogicalExpression} condition into a {@link QueryCondition} condition
+ * @param condition expressed as a {@link LogicalExpression}
+ * @return {@link QueryCondition} condition equivalent to the given expression
+ */
+ @JsonIgnore
+ public QueryCondition convertToQueryCondition(LogicalExpression condition) {
+ final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(this, condition);
+ final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
+ if (newScanSpec != null) {
+ return newScanSpec.getCondition();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Json table reader support limit
+ * @return
+ */
+ @Override
+ public boolean supportsLimitPushdown() {
+ return maxRecordsToRead < 0; // a limit that is already pushed is not pushed again
+ }
+
+ @Override
+ public GroupScan applyLimit(int maxRecords) {
+ maxRecordsToRead = Math.max(maxRecords, 1);
+ return this;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ if (this.parallelizationWidth > 0) {
+ return this.parallelizationWidth;
+ }
+ return super.getMaxParallelizationWidth();
+ }
+
+ @Override
+ public void setParallelizationWidth(int width) {
+ if (width > 0) {
+ this.parallelizationWidth = width;
+ logger.debug("Forced parallelization width = {}", width);
+ }
+ }
}
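
applyLimit() and supportsLimitPushdown() together make limit pushdown a one-shot operation on this scan. A hedged sketch of the planner-side sequence, with an arbitrary limit of 10:

    // First pass: maxRecordsToRead is still -1, so pushdown is allowed.
    if (scan.supportsLimitPushdown()) {
      scan.applyLimit(10);  // maxRecordsToRead becomes 10 (values below 1 are clamped to 1)
    }
    // A second supportsLimitPushdown() call now returns false, so the limit is
    // not pushed again; the value flows into getScanStats(), getRegionsToScan()
    // and the MapRDBSubScan built in getSpecificScan().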
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 628a986..63a9381 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -17,18 +17,16 @@
*/
package org.apache.drill.exec.store.mapr.db.json;
-import static org.ojai.DocumentConstants.ID_KEY;
-import static org.ojai.DocumentConstants.ID_FIELD;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.Stack;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import com.mapr.db.Table;
+import com.mapr.db.Table.TableOption;
+import com.mapr.db.exceptions.DBException;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.index.IndexDesc;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+import com.mapr.db.util.ByteBufs;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
@@ -40,47 +38,61 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
-import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.util.EncodedSchemaPathSet;
import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.fs.Path;
import org.ojai.DocumentReader;
-import org.ojai.DocumentReader.EventType;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
import org.ojai.FieldSegment;
-import org.ojai.Value;
import org.ojai.store.QueryCondition;
+import org.ojai.util.FieldProjector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
+
import com.mapr.db.MapRDB;
-import com.mapr.db.Table;
-import com.mapr.db.Table.TableOption;
-import com.mapr.db.exceptions.DBException;
-import com.mapr.db.impl.IdCodec;
-import com.mapr.db.ojai.DBDocumentReaderBase;
-import com.mapr.db.util.ByteBufs;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
-import io.netty.buffer.DrillBuf;
+import static org.apache.drill.exec.store.mapr.PluginConstants.DOCUMENT_SCHEMA_PATH;
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.dataReadError;
+import static org.ojai.DocumentConstants.ID_FIELD;
public class MaprDBJsonRecordReader extends AbstractRecordReader {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
- private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
+ protected static final FieldPath[] ID_ONLY_PROJECTION = { ID_FIELD };
- private Table table;
- private QueryCondition condition;
- private FieldPath[] projectedFields;
+ protected Table table;
+ protected QueryCondition condition;
+
+ /**
+ * A set of projected FieldPaths that are pushed into MapR-DB Scanner.
+ * This set is a superset of the fields returned by {@link #getColumns()} when
+ * projection pass-through is in effect. In such cases, {@link #getColumns()}
+ * returns only those fields which are required by Drill to run its operators.
+ */
+ private FieldPath[] scannedFields;
- private final String tableName;
private OperatorContext operatorContext;
- private VectorContainerWriter vectorWriter;
+ protected VectorContainerWriter vectorWriter;
private DBDocumentReaderBase reader;
private DrillBuf buffer;
@@ -91,6 +103,10 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private boolean includeId;
private boolean idOnly;
+
+ private boolean projectWholeDocument;
+ private FieldProjector projector;
+
private final boolean unionEnabled;
private final boolean readNumbersAsDouble;
private boolean disablePushdown;
@@ -99,15 +115,27 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final boolean disableCountOptimization;
private final boolean nonExistentColumnsProjection;
- public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
- MapRDBFormatPluginConfig formatPluginConfig,
- List<SchemaPath> projectedColumns, FragmentContext context) {
+ protected final MapRDBSubScanSpec subScanSpec;
+ protected final MapRDBFormatPlugin formatPlugin;
+
+ protected OjaiValueWriter valueWriter;
+ protected DocumentReaderVectorWriter documentWriter;
+ protected int maxRecordsToRead = -1;
+
+ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
+ List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) {
+ this(subScanSpec, formatPlugin, projectedColumns, context);
+ this.maxRecordsToRead = maxRecords;
+ }
+
+ protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
+ List<SchemaPath> projectedColumns, FragmentContext context) {
buffer = context.getManagedBuffer();
- projectedFields = null;
- tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName();
- documentReaderIterators = null;
- includeId = false;
- idOnly = false;
+ final Path tablePath = new Path(Preconditions.checkNotNull(subScanSpec,
+ "MapRDB reader needs a sub-scan spec").getTableName());
+ this.subScanSpec = subScanSpec;
+ this.formatPlugin = formatPlugin;
+ final IndexDesc indexDesc = subScanSpec.getIndexDesc();
byte[] serializedFilter = subScanSpec.getSerializedFilter();
condition = null;
@@ -115,75 +143,153 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter));
}
- disableCountOptimization = formatPluginConfig.disableCountOptimization();
+ disableCountOptimization = formatPlugin.getConfig().disableCountOptimization();
+ // The call below will set scannedFields and includeId correctly.
setColumns(projectedColumns);
- unionEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
- readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
- allTextMode = formatPluginConfig.isAllTextMode();
- ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange();
- disablePushdown = !formatPluginConfig.isEnablePushdown();
- nonExistentColumnsProjection = formatPluginConfig.isNonExistentFieldSupport();
+ unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+ readNumbersAsDouble = formatPlugin.getConfig().isReadAllNumbersAsDouble();
+ allTextMode = formatPlugin.getConfig().isAllTextMode();
+ ignoreSchemaChange = formatPlugin.getConfig().isIgnoreSchemaChange();
+ disablePushdown = !formatPlugin.getConfig().isEnablePushdown();
+ nonExistentColumnsProjection = formatPlugin.getConfig().isNonExistentFieldSupport();
+
+ // Do not use a cached table handle, for two reasons:
+ // 1. Cached table handles time out after 60 minutes by default, after which they become stale.
+ //    Since execution can run for longer than 60 minutes, we want to get a new table handle
+ //    instead of one from the cache.
+ // 2. Since we are setting some table options, we do not want to use shared handles.
+ //
+ // Call this here instead of in setup() to make sure it's called under the correct UGI block when
+ // impersonation is enabled and the table is used both with and without views.
+ table = (indexDesc == null ? MapRDBImpl.getTable(tablePath) : MapRDBImpl.getIndexTable(indexDesc));
+
+ if (condition != null) {
+ logger.debug("Created record reader with query condition {}", condition.toString());
+ } else {
+ logger.debug("Created record reader with query condition NULL");
+ }
}
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+ Set<SchemaPath> encodedSchemaPathSet = Sets.newLinkedHashSet();
+
if (disablePushdown) {
transformed.add(SchemaPath.STAR_COLUMN);
includeId = true;
- return transformed;
- }
+ } else {
+ if (isStarQuery()) {
+ transformed.add(SchemaPath.STAR_COLUMN);
+ includeId = true;
+ if (isSkipQuery() && !disableCountOptimization) {
+ // `SELECT COUNT(*)` query
+ idOnly = true;
+ scannedFields = ID_ONLY_PROJECTION;
+ }
+ } else {
+ Set<FieldPath> scannedFieldsSet = Sets.newTreeSet();
+ Set<FieldPath> projectedFieldsSet = null;
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- includeId = true;
- if (isSkipQuery()) {
- // `SELECT COUNT(*)` query
- if (!disableCountOptimization) {
- projectedFields = new FieldPath[1];
- projectedFields[0] = ID_FIELD;
+ for (SchemaPath column : columns) {
+ if (EncodedSchemaPathSet.isEncodedSchemaPath(column)) {
+ encodedSchemaPathSet.add(column);
+ } else {
+ transformed.add(column);
+ if (!DOCUMENT_SCHEMA_PATH.equals(column)) {
+ FieldPath fp = getFieldPathForProjection(column);
+ scannedFieldsSet.add(fp);
+ } else {
+ projectWholeDocument = true;
+ }
+ }
+ }
+ if (projectWholeDocument) {
+ // We do not want to project the fields from the encoded field path list,
+ // so make a copy of the scannedFieldsSet here for projection.
+ projectedFieldsSet = new ImmutableSet.Builder<FieldPath>()
+ .addAll(scannedFieldsSet).build();
}
- }
- return transformed;
- }
- Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
- for (SchemaPath column : columns) {
- if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
- includeId = true;
- if (!disableCountOptimization) {
- projectedFieldsSet.add(ID_FIELD);
+ if (encodedSchemaPathSet.size() > 0) {
+ Collection<SchemaPath> decodedSchemaPaths = EncodedSchemaPathSet.decode(encodedSchemaPathSet);
+ // Now look at the fields that are part of the encoded field set: either add them
+ // to the scanned set, or clear the scanned set if all fields were requested.
+ for (SchemaPath column : decodedSchemaPaths) {
+ if (column.equals(SchemaPath.STAR_COLUMN)) {
+ includeId = true;
+ scannedFieldsSet.clear();
+ break;
+ }
+ scannedFieldsSet.add(getFieldPathForProjection(column));
+ }
}
- } else {
- projectedFieldsSet.add(getFieldPathForProjection(column));
- }
- transformed.add(column);
- }
+ if (scannedFieldsSet.size() > 0) {
+ if (includesIdField(scannedFieldsSet)) {
+ includeId = true;
+ }
+ scannedFields = scannedFieldsSet.toArray(new FieldPath[scannedFieldsSet.size()]);
+ }
- if (projectedFieldsSet.size() > 0) {
- projectedFields = projectedFieldsSet.toArray(new FieldPath[projectedFieldsSet.size()]);
- }
+ if (disableCountOptimization) {
+ idOnly = (scannedFields == null);
+ }
- if (disableCountOptimization) {
- idOnly = (projectedFields == null);
- }
+ if (projectWholeDocument) {
+ projector = new FieldProjector(projectedFieldsSet);
+ }
+ }
+ }
return transformed;
}
+ protected FieldPath[] getScannedFields() {
+ return scannedFields;
+ }
+
+ protected boolean getIdOnly() {
+ return idOnly;
+ }
+
+ protected Table getTable() {
+ return table;
+ }
+
+ protected boolean getIgnoreSchemaChange() {
+ return ignoreSchemaChange;
+ }
+
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
this.operatorContext = context;
try {
- table = MapRDB.getTable(tableName);
table.setOption(TableOption.EXCLUDEID, !includeId);
- documentStream = table.find(condition, projectedFields);
+ documentStream = table.find(condition, scannedFields);
documentReaderIterators = documentStream.documentReaders().iterator();
- } catch (DBException e) {
- throw new ExecutionSetupException(e);
+
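+ // Pick a value writer based on the format options, then wrap it in the document
+ // writer matching the projection type (pass-through, count-only, id-only, or field transfer).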
+ if (allTextMode) {
+ valueWriter = new AllTextValueWriter(buffer);
+ } else if (readNumbersAsDouble) {
+ valueWriter = new NumbersAsDoubleValueWriter(buffer);
+ } else {
+ valueWriter = new OjaiValueWriter(buffer);
+ }
+
+ if (projectWholeDocument) {
+ documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
+ } else if (isSkipQuery()) {
+ documentWriter = new RowCountVectorWriter(valueWriter);
+ } else if (idOnly) {
+ documentWriter = new IdOnlyVectorWriter(valueWriter);
+ } else {
+ documentWriter = new FieldTransferVectorWriter(valueWriter);
+ }
+ } catch (DBException ex) {
+ throw new ExecutionSetupException(ex);
}
}
@@ -198,24 +304,17 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
int recordCount = 0;
reader = null;
- while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
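+ // A non-negative maxRecordsToRead means a limit was pushed down; cap this batch accordingly.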
+ int maxRecordsForThisBatch = this.maxRecordsToRead >= 0 ?
+ Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) : BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+ while (recordCount < maxRecordsForThisBatch) {
vectorWriter.setPosition(recordCount);
try {
reader = nextDocumentReader();
if (reader == null) {
- break; // no more documents for this scanner
- } else if (isSkipQuery()) {
- vectorWriter.rootAsMap().bit("count").writeBit(1);
+ break; // no more documents for this reader
} else {
- MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap());
- if (idOnly) {
- writeId(writer, reader.getId());
- } else {
- if (reader.next() != EventType.START_MAP) {
- throw dataReadError("The document did not start with START_MAP!");
- }
- writeToListOrMap(writer, reader);
- }
+ documentWriter.writeDBDocument(vectorWriter, reader);
}
recordCount++;
} catch (UserException e) {
@@ -225,11 +324,12 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
reader == null ? null : IdCodec.asString(reader.getId())))
.build(logger);
} catch (SchemaChangeException e) {
+ String errRow = reader.getId().asJsonString();
if (ignoreSchemaChange) {
- logger.warn("{}. Dropping the row from result.", e.getMessage());
+ logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
logger.debug("Stack trace:", e);
} else {
- throw dataReadError(e);
+ throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
}
}
}
@@ -238,243 +338,14 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.EMPTY_LIST);
}
vectorWriter.setValueCount(recordCount);
+ if (maxRecordsToRead > 0) {
+ maxRecordsToRead -= recordCount;
+ }
logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount);
return recordCount;
}
- private void writeId(MapOrListWriterImpl writer, Value id) throws SchemaChangeException {
- try {
- switch(id.getType()) {
- case STRING:
- writeString(writer, ID_KEY, id.getString());
- break;
- case BINARY:
- writeBinary(writer, ID_KEY, id.getBinary());
- break;
- default:
- throw new UnsupportedOperationException(id.getType() +
- " is not a supported type for _id field.");
- }
- } catch (IllegalStateException | IllegalArgumentException e) {
- throw schemaChangeException(e, "Possible schema change at _id: '%s'", IdCodec.asString(id));
- }
- }
-
- private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) throws SchemaChangeException {
- String fieldName = null;
- writer.start();
- outside: while (true) {
- EventType event = reader.next();
- if (event == null
- || event == EventType.END_MAP
- || event == EventType.END_ARRAY) {
- break outside;
- } else if (reader.inMap()) {
- fieldName = reader.getFieldName();
- }
-
- try {
- switch (event) {
- case NULL:
- break; // not setting the field will leave it as null
- case BINARY:
- writeBinary(writer, fieldName, reader.getBinary());
- break;
- case BOOLEAN:
- writeBoolean(writer, fieldName, reader);
- break;
- case STRING:
- writeString(writer, fieldName, reader.getString());
- break;
- case BYTE:
- writeByte(writer, fieldName, reader);
- break;
- case SHORT:
- writeShort(writer, fieldName, reader);
- break;
- case INT:
- writeInt(writer, fieldName, reader);
- break;
- case LONG:
- writeLong(writer, fieldName, reader);
- break;
- case FLOAT:
- writeFloat(writer, fieldName, reader);
- break;
- case DOUBLE:
- writeDouble(writer, fieldName, reader);
- break;
- case DECIMAL:
- writeDecimal(writer, fieldName, reader);
- case DATE:
- writeDate(writer, fieldName, reader);
- break;
- case TIME:
- writeTime(writer, fieldName, reader);
- break;
- case TIMESTAMP:
- writeTimeStamp(writer, fieldName, reader);
- break;
- case INTERVAL:
- throw unsupportedError("Interval type is currently not supported.");
- case START_MAP:
- writeToListOrMap((MapOrListWriterImpl) (reader.inMap() ? writer.map(fieldName) : writer.listoftmap(fieldName)), reader);
- break;
- case START_ARRAY:
- writeToListOrMap((MapOrListWriterImpl) writer.list(fieldName), reader);
- break;
- default:
- throw unsupportedError("Unsupported type: %s encountered during the query.", event);
- }
- } catch (IllegalStateException | IllegalArgumentException e) {
- throw schemaChangeException(e, "Possible schema change at _id: '%s', field: '%s'", IdCodec.asString(reader.getId()), fieldName);
- }
- }
- writer.end();
- }
-
- private void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, reader.getTimestamp().toUTCString());
- } else {
- ((writer.map != null) ? writer.map.timeStamp(fieldName) : writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong());
- }
- }
-
- private void writeTime(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, reader.getTime().toTimeStr());
- } else {
- ((writer.map != null) ? writer.map.time(fieldName) : writer.list.time()).writeTime(reader.getTimeInt());
- }
- }
-
- private void writeDate(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, reader.getDate().toDateStr());
- } else {
- long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY;
- ((writer.map != null) ? writer.map.date(fieldName) : writer.list.date()).writeDate(milliSecondsSinceEpoch);
- }
- }
-
- private void writeDouble(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getDouble()));
- } else {
- writer.float8(fieldName).writeFloat8(reader.getDouble());
- }
- }
-
- private void writeDecimal(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getDecimal()));
- } else {
- writer.varDecimal(fieldName, reader.getDecimalScale(), reader.getDecimalPrecision())
- .writeVarDecimal(reader.getDecimal());
- }
- }
-
- private void writeFloat(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getFloat()));
- } else if (readNumbersAsDouble) {
- writer.float8(fieldName).writeFloat8(reader.getFloat());
- } else {
- writer.float4(fieldName).writeFloat4(reader.getFloat());
- }
- }
-
- private void writeLong(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getLong()));
- } else if (readNumbersAsDouble) {
- writer.float8(fieldName).writeFloat8(reader.getLong());
- } else {
- writer.bigInt(fieldName).writeBigInt(reader.getLong());
- }
- }
-
- private void writeInt(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getInt()));
- } else if (readNumbersAsDouble) {
- writer.float8(fieldName).writeFloat8(reader.getInt());
- } else {
- writer.integer(fieldName).writeInt(reader.getInt());
- }
- }
-
- private void writeShort(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getShort()));
- } else if (readNumbersAsDouble) {
- writer.float8(fieldName).writeFloat8(reader.getShort());
- } else {
- ((writer.map != null) ? writer.map.smallInt(fieldName) : writer.list.smallInt()).writeSmallInt(reader.getShort());
- }
- }
-
- private void writeByte(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getByte()));
- } else if (readNumbersAsDouble) {
- writer.float8(fieldName).writeFloat8(reader.getByte());
- } else {
- ((writer.map != null) ? writer.map.tinyInt(fieldName) : writer.list.tinyInt()).writeTinyInt(reader.getByte());
- }
- }
-
- private void writeBoolean(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
- if (allTextMode) {
- writeString(writer, fieldName, String.valueOf(reader.getBoolean()));
- } else {
- writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
- }
- }
-
- private void writeBinary(MapOrListWriterImpl writer, String fieldName, ByteBuffer buf) {
- if (allTextMode) {
- writeString(writer, fieldName, Bytes.toString(buf));
- } else {
- buffer = buffer.reallocIfNeeded(buf.remaining());
- buffer.setBytes(0, buf, buf.position(), buf.remaining());
- writer.binary(fieldName).writeVarBinary(0, buf.remaining(), buffer);
- }
- }
-
- private void writeString(MapOrListWriterImpl writer, String fieldName, String value) {
- final byte[] strBytes = Bytes.toBytes(value);
- buffer = buffer.reallocIfNeeded(strBytes.length);
- buffer.setBytes(0, strBytes);
- writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer);
- }
-
- private UserException unsupportedError(String format, Object... args) {
- return UserException.unsupportedError()
- .message(String.format(format, args))
- .build(logger);
- }
-
- private UserException dataReadError(Throwable t) {
- return dataReadError(t, null);
- }
-
- private UserException dataReadError(String format, Object... args) {
- return dataReadError(null, format, args);
- }
-
- private UserException dataReadError(Throwable t, String format, Object... args) {
- return UserException.dataReadError(t)
- .message(format == null ? null : String.format(format, args))
- .build(logger);
- }
-
- private SchemaChangeException schemaChangeException(Throwable t, String format, Object... args) {
- return new SchemaChangeException(format, t, args);
- }
-
- private DBDocumentReaderBase nextDocumentReader() {
+ protected DBDocumentReaderBase nextDocumentReader() {
final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
try {
if (operatorStats != null) {
@@ -492,7 +363,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
}
} catch (DBException e) {
- throw dataReadError(e);
+ throw dataReadError(logger, e);
}
}
@@ -505,7 +376,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
* first encountered ARRAY field and let Drill handle the projection.
*/
private static FieldPath getFieldPathForProjection(SchemaPath column) {
- Stack<PathSegment.NameSegment> pathSegments = new Stack<PathSegment.NameSegment>();
+ Stack<PathSegment.NameSegment> pathSegments = new Stack<>();
PathSegment seg = column.getRootSegment();
while (seg != null && seg.isNamed()) {
pathSegments.push((PathSegment.NameSegment) seg);
@@ -518,6 +389,15 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
return new FieldPath(child);
}
+ public static boolean includesIdField(Collection<FieldPath> projected) {
+ return Iterables.tryFind(projected, new Predicate<FieldPath>() {
+ @Override
+ public boolean apply(FieldPath path) {
+ return Preconditions.checkNotNull(path).equals(ID_FIELD);
+ }
+ }).isPresent();
+ }
+
@Override
public void close() {
if (documentStream != null) {
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/NumbersAsDoubleValueWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/NumbersAsDoubleValueWriter.java
new file mode 100644
index 0000000..d7d38cb
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/NumbersAsDoubleValueWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.ojai.DocumentReader;
+
+import io.netty.buffer.DrillBuf;
+
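+/**
+ * An OjaiValueWriter that writes all integral and floating-point values as Drill FLOAT8.
+ */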
+public class NumbersAsDoubleValueWriter extends OjaiValueWriter {
+
+ public NumbersAsDoubleValueWriter(DrillBuf buffer) {
+ super(buffer);
+ }
+
+ protected void writeFloat(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getFloat());
+ }
+
+ protected void writeLong(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getLong());
+ }
+
+ protected void writeInt(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getInt());
+ }
+
+ protected void writeShort(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getShort());
+ }
+
+ protected void writeByte(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getByte());
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiFunctionsProcessor.java
new file mode 100644
index 0000000..959e243
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiFunctionsProcessor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.commons.codec.binary.Base64;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import org.ojai.Value;
+import org.ojai.store.QueryCondition;
+
+import com.google.common.collect.ImmutableMap;
+import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.MapRDBImpl;
+
+import java.nio.ByteBuffer;
+
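+/**
+ * Visits a Drill FunctionCall for one of the ojai_* helper functions and translates it
+ * into the equivalent OJAI QueryCondition for push-down.
+ */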
+class OjaiFunctionsProcessor extends AbstractExprVisitor<Void, Void, RuntimeException> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OjaiFunctionsProcessor.class);
+ private QueryCondition queryCond;
+
+ private OjaiFunctionsProcessor() {
+ }
+
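+ // Renders the current stack trace (excluding this frame) for debug logging.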
+ private static String getStackTrace() {
+ final Throwable throwable = new Throwable();
+ final StackTraceElement[] ste = throwable.getStackTrace();
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 1; i < ste.length; ++i) {
+ sb.append(ste[i].toString());
+ sb.append('\n');
+ }
+
+ return sb.toString();
+ }
+
+ @Override
+ public Void visitUnknown(LogicalExpression e, Void valueArg) throws RuntimeException {
+ logger.debug("visitUnknown() e class " + e.getClass());
+ logger.debug(getStackTrace());
+ return null;
+ }
+
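+ // Mutable single-value holder used to capture a result from an anonymous visitor.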
+ private static class Ref<T> {
+ T value;
+ }
+
+ private static SchemaPath getSchemaPathArg(LogicalExpression expr) {
+ final Ref<SchemaPath> ref = new Ref<>();
+ expr.accept(new OjaiFunctionsProcessor() {
+ @Override
+ public Void visitSchemaPath(SchemaPath e, Void v) {
+ ref.value = e;
+ return null;
+ }
+ }, null);
+
+ return ref.value;
+ }
+
+ private static String getStringArg(LogicalExpression expr) {
+ final Ref<QuotedString> ref = new Ref<>();
+ expr.accept(new OjaiFunctionsProcessor() {
+ @Override
+ public Void visitQuotedStringConstant(QuotedString e, Void v) {
+ ref.value = e;
+ return null;
+ }
+ }, null);
+
+ return ref.value != null ? ref.value.getString() : null;
+ }
+
+ private static int getIntArg(LogicalExpression expr) {
+ final Ref<Integer> ref = new Ref<>();
+ expr.accept(new OjaiFunctionsProcessor() {
+ @Override
+ public Void visitIntConstant(IntExpression e, Void v) {
+ ref.value = Integer.valueOf(e.getInt());
+ return null;
+ }
+ }, null);
+
+ return ref.value != null ? ref.value.intValue() : 0;
+ }
+
+ private static long getLongArg(LogicalExpression expr) {
+ final Ref<Long> ref = new Ref<>();
+ expr.accept(new OjaiFunctionsProcessor() {
+ @Override
+ public Void visitIntConstant(IntExpression e, Void v) {
+ ref.value = Long.valueOf(e.getInt());
+ return null;
+ }
+
+ @Override
+ public Void visitLongConstant(LongExpression e, Void v) {
+ ref.value = e.getLong();
+ return null;
+ }
+ }, null);
+
+ return ref.value != null ? ref.value.longValue() : 0;
+ }
+
+ private static final ImmutableMap<String, QueryCondition.Op> STRING_TO_RELOP;
+ static {
+ ImmutableMap.Builder<String, QueryCondition.Op> builder = ImmutableMap.builder();
+ STRING_TO_RELOP = builder
+ .put("=", QueryCondition.Op.EQUAL)
+ .put("<>", QueryCondition.Op.NOT_EQUAL)
+ .put("<", QueryCondition.Op.LESS)
+ .put("<=", QueryCondition.Op.LESS_OR_EQUAL)
+ .put(">", QueryCondition.Op.GREATER)
+ .put(">=", QueryCondition.Op.GREATER_OR_EQUAL)
+ .build();
+ }
+
+ @Override
+ public Void visitFunctionCall(FunctionCall call, Void v) throws RuntimeException {
+ final String functionName = call.getName();
+ final String fieldName = FieldPathHelper.schemaPath2FieldPath(getSchemaPathArg(call.args.get(0))).asPathString();
+ switch (functionName) {
+ case "ojai_sizeof": {
+ // ojai_sizeof(field, "<rel-op>", <int-value>)
+ final String relOp = getStringArg(call.args.get(1));
+ final long size = getLongArg(call.args.get(2));
+ queryCond = MapRDBImpl.newCondition()
+ .sizeOf(fieldName, STRING_TO_RELOP.get(relOp), size)
+ .build();
+ break;
+ }
+
+ case "ojai_typeof":
+ case "ojai_nottypeof": {
+ // ojai_[not]typeof(field, <type-code>);
+ final int typeCode = getIntArg(call.args.get(1));
+ final Value.Type typeValue = Value.Type.valueOf(typeCode);
+ queryCond = MapRDBImpl.newCondition();
+ if (functionName.equals("ojai_typeof")) {
+ queryCond.typeOf(fieldName, typeValue);
+ } else {
+ queryCond.notTypeOf(fieldName, typeValue);
+ }
+ queryCond.build();
+ break;
+ }
+
+ case "ojai_matches":
+ case "ojai_notmatches": {
+ // ojai_[not]matches(field, <regex>);
+ final String regex = getStringArg(call.args.get(1));
+ if (functionName.equals("ojai_matches")) {
+ queryCond = MapRDBImpl.newCondition()
+ .matches(fieldName, regex);
+ } else {
+ queryCond = MapRDBImpl.newCondition()
+ .notMatches(fieldName, regex);
+ }
+ queryCond.build();
+ break;
+ }
+
+ case "ojai_condition": {
+ // ojai_condition(field, <serialized-condition>);
+ final String condString = getStringArg(call.args.get(1));
+ final byte[] condBytes = Base64.decodeBase64(condString);
+ final ByteBuffer condBuffer = ByteBuffer.wrap(condBytes);
+ queryCond = ConditionImpl.parseFrom(condBuffer);
+ break;
+ }
+
+ default:
+ throw new IllegalArgumentException("unrecognized functionName " + functionName);
+ } // switch(functionName)
+
+ return null;
+ }
+
+ public static OjaiFunctionsProcessor process(FunctionCall call) {
+ final OjaiFunctionsProcessor processor = new OjaiFunctionsProcessor();
+
+ call.accept(processor, null);
+ return processor;
+ }
+
+ public QueryCondition getCondition() {
+ return queryCond;
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiValueWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiValueWriter.java
new file mode 100644
index 0000000..99b2157
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/OjaiValueWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.schemaChangeException;
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.unsupportedError;
+
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapOrListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.ojai.DocumentReader;
+import org.ojai.DocumentReader.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+import io.netty.buffer.DrillBuf;
+
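+/**
+ * Writes OJAI DocumentReader events into Drill's complex vector writers, mapping each
+ * OJAI value type to the corresponding Drill type. Subclasses override individual
+ * write methods to change the type mapping (e.g. all-text mode, numbers-as-double).
+ */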
+public class OjaiValueWriter {
+ protected static final Logger logger = LoggerFactory.getLogger(OjaiValueWriter.class);
+
+ protected static final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
+
+ protected DrillBuf buffer;
+
+ public OjaiValueWriter(DrillBuf buffer) {
+ this.buffer = buffer;
+ }
+
+ /*
+ * Precondition: the reader has already emitted a START_MAP/START_ARRAY event.
+ */
+ protected void writeToListOrMap(MapOrListWriterImpl writer, DocumentReader reader) throws SchemaChangeException {
+ String fieldName = null;
+ writer.start();
+ outside: while (true) {
+ EventType event = reader.next();
+ if (event == null
+ || event == EventType.END_MAP
+ || event == EventType.END_ARRAY) {
+ break outside;
+ } else if (reader.inMap()) {
+ fieldName = reader.getFieldName();
+ }
+
+ try {
+ switch (event) {
+ case NULL:
+ break; // not setting the field will leave it as null
+ case BINARY:
+ writeBinary(writer, fieldName, reader.getBinary());
+ break;
+ case BOOLEAN:
+ writeBoolean(writer, fieldName, reader);
+ break;
+ case STRING:
+ writeString(writer, fieldName, reader.getString());
+ break;
+ case BYTE:
+ writeByte(writer, fieldName, reader);
+ break;
+ case SHORT:
+ writeShort(writer, fieldName, reader);
+ break;
+ case INT:
+ writeInt(writer, fieldName, reader);
+ break;
+ case LONG:
+ writeLong(writer, fieldName, reader);
+ break;
+ case FLOAT:
+ writeFloat(writer, fieldName, reader);
+ break;
+ case DOUBLE:
+ writeDouble(writer, fieldName, reader);
+ break;
+ case DECIMAL:
+ throw unsupportedError(logger, "Decimal type is currently not supported.");
+ case DATE:
+ writeDate(writer, fieldName, reader);
+ break;
+ case TIME:
+ writeTime(writer, fieldName, reader);
+ break;
+ case TIMESTAMP:
+ writeTimeStamp(writer, fieldName, reader);
+ break;
+ case INTERVAL:
+ throw unsupportedError(logger, "Interval type is currently not supported.");
+ case START_MAP:
+ writeToListOrMap((MapOrListWriterImpl) (reader.inMap() ? writer.map(fieldName) : writer.listoftmap(fieldName)), reader);
+ break;
+ case START_ARRAY:
+ writeToListOrMap((MapOrListWriterImpl) writer.list(fieldName), reader);
+ break;
+ default:
+ throw unsupportedError(logger, "Unsupported type: %s encountered during the query.", event);
+ }
+ } catch (IllegalStateException | IllegalArgumentException e) {
+ throw schemaChangeException(logger, e, "Possible schema change for field: '%s'", fieldName);
+ }
+ }
+ writer.end();
+ }
+
+ protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+ }
+
+ protected void writeTime(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.time(fieldName).writeTime(reader.getTimeInt());
+ }
+
+ protected void writeDate(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
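+ // getDateInt() returns days since the epoch; convert to milliseconds for Drill's date writer.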
+ long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY;
+ writer.date(fieldName).writeDate(milliSecondsSinceEpoch);
+ }
+
+ protected void writeDouble(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float8(fieldName).writeFloat8(reader.getDouble());
+ }
+
+ protected void writeFloat(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.float4(fieldName).writeFloat4(reader.getFloat());
+ }
+
+ protected void writeLong(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.bigInt(fieldName).writeBigInt(reader.getLong());
+ }
+
+ protected void writeInt(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.integer(fieldName).writeInt(reader.getInt());
+ }
+
+ protected void writeShort(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.smallInt(fieldName).writeSmallInt(reader.getShort());
+ }
+
+ protected void writeByte(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.tinyInt(fieldName).writeTinyInt(reader.getByte());
+ }
+
+ protected void writeBoolean(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+ }
+
+ protected void writeBinary(MapOrListWriter writer, String fieldName, ByteBuffer buf) {
+ int bufLen = buf.remaining();
+ buffer = buffer.reallocIfNeeded(bufLen);
+ buffer.setBytes(0, buf, buf.position(), bufLen);
+ writer.varBinary(fieldName).writeVarBinary(0, bufLen, buffer);
+ }
+
+ protected void writeString(MapOrListWriter writer, String fieldName, String value) {
+ final byte[] strBytes = Bytes.toBytes(value);
+ buffer = buffer.reallocIfNeeded(strBytes.length);
+ buffer.setBytes(0, strBytes);
+ writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer);
+ }
+
+ protected void writeBinary(MapWriter writer, String fieldName, ByteBuffer buf) {
+ int bufLen = buf.remaining();
+ buffer = buffer.reallocIfNeeded(bufLen);
+ buffer.setBytes(0, buf, buf.position(), bufLen);
+ writer.varBinary(fieldName).writeVarBinary(0, bufLen, buffer);
+ }
+
+ protected void writeString(MapWriter writer, String fieldName, String value) {
+ final byte[] strBytes = Bytes.toBytes(value);
+ buffer = buffer.reallocIfNeeded(strBytes.length);
+ buffer.setBytes(0, strBytes);
+ writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer);
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/ProjectionPassthroughVectorWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/ProjectionPassthroughVectorWriter.java
new file mode 100644
index 0000000..bc304fd
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/ProjectionPassthroughVectorWriter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.dataReadError;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapOrListWriter;
+import org.ojai.DocumentConstants;
+import org.ojai.DocumentReader.EventType;
+import org.ojai.util.DocumentReaderWithProjection;
+import org.ojai.util.FieldProjector;
+
+import com.google.common.base.Preconditions;
+import com.mapr.db.DBConstants;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+/**
+ * This implementation of DocumentReaderVectorWriter writes the encoded MapR-DB OJAI document
+ * as binary data, along with the decoded fields required by Drill's downstream operators.
+ */
+class ProjectionPassthroughVectorWriter extends DocumentReaderVectorWriter {
+
+ private final boolean includeId;
+ private final FieldProjector projector;
+
+ protected ProjectionPassthroughVectorWriter(final OjaiValueWriter valueWriter,
+ final FieldProjector projector, final boolean includeId) {
+ super(valueWriter);
+ this.includeId = includeId;
+ this.projector = Preconditions.checkNotNull(projector);
+ }
+
+ @Override
+ protected void writeDBDocument(VectorContainerWriter vectorWriter, DBDocumentReaderBase reader)
+ throws SchemaChangeException {
+ if (reader.next() != EventType.START_MAP) {
+ throw dataReadError(logger, "The document did not start with START_MAP!");
+ }
+
+ MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap());
+ writer.start();
+ MapOrListWriter documentMapWriter = writer.map(DBConstants.DOCUMENT_FIELD);
+ documentMapWriter.start();
+
+ // write _id field data
+ if (includeId) {
+ valueWriter.writeBinary(documentMapWriter, DocumentConstants.ID_KEY, reader.getIdData());
+ }
+
+ // write rest of the data buffers
+ Map<Integer, ByteBuffer> dataMap = reader.getDataMap();
+ for (Entry<Integer, ByteBuffer> familyData : dataMap.entrySet()) {
+ valueWriter.writeBinary(documentMapWriter, String.valueOf(familyData.getKey()), familyData.getValue());
+ }
+ documentMapWriter.end();
+
+ DocumentReaderWithProjection p = new DocumentReaderWithProjection(reader, projector);
+ valueWriter.writeToListOrMap(writer, p);
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
new file mode 100644
index 0000000..01ff2d0
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.PluginErrorHandler.dataReadError;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.mapr.db.impl.BaseJsonTable;
+import com.mapr.db.impl.MultiGet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
+
+import com.google.common.base.Stopwatch;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+import org.ojai.Document;
+import org.ojai.DocumentStream;
+import org.ojai.FieldPath;
+
+
+public class RestrictedJsonRecordReader extends MaprDBJsonRecordReader {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RestrictedJsonRecordReader.class);
+
+ private int batchSize; // batch size for row-key-based document gets
+
+ private String[] projections = null; // multi-get projections
+
+ public RestrictedJsonRecordReader(MapRDBSubScanSpec subScanSpec,
+ MapRDBFormatPlugin formatPlugin,
+ List<SchemaPath> projectedColumns, FragmentContext context) {
+
+ super(subScanSpec, formatPlugin, projectedColumns, context);
+ batchSize = (int)context.getOptions().getOption(ExecConstants.QUERY_ROWKEYJOIN_BATCHSIZE);
+ int idx = 0;
+ FieldPath[] scannedFields = this.getScannedFields();
+
+ // only populate projections for non-star query (for star, null is interpreted as all fields)
+ if (!this.isStarQuery() && scannedFields != null && scannedFields.length > 0) {
+ projections = new String[scannedFields.length];
+ for (FieldPath path : scannedFields) {
+ projections[idx] = path.asPathString();
+ ++idx;
+ }
+ }
+ }
+
+ public RestrictedJsonRecordReader(MapRDBSubScanSpec subScanSpec,
+ MapRDBFormatPlugin formatPlugin,
+ List<SchemaPath> projectedColumns,
+ FragmentContext context,
+ int maxRecordsToRead) {
+
+ super(subScanSpec, formatPlugin, projectedColumns, context, maxRecordsToRead);
+ batchSize = (int)context.getOptions().getOption(ExecConstants.QUERY_ROWKEYJOIN_BATCHSIZE);
+ int idx = 0;
+ FieldPath[] scannedFields = this.getScannedFields();
+
+ // only populate projections for non-star query (for star, null is interpreted as all fields)
+ if (!this.isStarQuery() && scannedFields != null && scannedFields.length > 0) {
+ projections = new String[scannedFields.length];
+ for (FieldPath path : scannedFields) {
+ projections[idx] = path.asPathString();
+ ++idx;
+ }
+ }
+ }
+
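+ // Reads a single document from the table to materialize an initial output schema,
+ // then resets the writer position so the sampled row is overwritten by real output.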
+ public void readToInitSchema() {
+ DBDocumentReaderBase reader = null;
+ vectorWriter.setPosition(0);
+
+ try (DocumentStream dstream = table.find()) {
+ reader = (DBDocumentReaderBase) dstream.iterator().next().asReader();
+ documentWriter.writeDBDocument(vectorWriter, reader);
+ } catch(UserException e) {
+ throw UserException.unsupportedError(e)
+ .addContext(String.format("Table: %s, document id: '%s'",
+ getTable().getPath(),
+ reader == null ? null : IdCodec.asString(reader.getId())))
+ .build(logger);
+ } catch (SchemaChangeException e) {
+ if (getIgnoreSchemaChange()) {
+ logger.warn("{}. Dropping the row from result.", e.getMessage());
+ logger.debug("Stack trace:", e);
+ } else {
+ throw dataReadError(logger, e);
+ }
+ }
+ finally {
+ vectorWriter.setPosition(0);
+ }
+ }
+
+ @Override
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ RestrictedMapRDBSubScanSpec rss = (RestrictedMapRDBSubScanSpec) this.subScanSpec;
+ RowKeyJoin rjBatch = rss.getJoinForSubScan();
+ if (rjBatch == null) {
+ throw new ExecutionSetupException("RowKeyJoin Batch is not setup for Restricted MapRDB Subscan");
+ }
+
+ AbstractRecordBatch.BatchState state = rjBatch.getBatchState();
+ if (state == AbstractRecordBatch.BatchState.BUILD_SCHEMA ||
+ state == AbstractRecordBatch.BatchState.FIRST) {
+ super.setup(context, output);
+ }
+ }
+
+ @Override
+ public int next() {
+ Stopwatch watch = Stopwatch.createUnstarted();
+ watch.start();
+ RestrictedMapRDBSubScanSpec rss = (RestrictedMapRDBSubScanSpec) this.subScanSpec;
+
+ vectorWriter.allocate();
+ vectorWriter.reset();
+
+ if (!rss.readyToGetRowKey()) {
+ // not ready to get rowkey, so we just load a record to initialize schema; only do this
+ // when we are in the build schema phase
+ if (rss.isBuildSchemaPhase()) {
+ readToInitSchema();
+ }
+ return 0;
+ }
+
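+ // Fetch documents in row-key batches supplied by the enclosing row-key join.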
+ final MultiGet multiGet = new MultiGet((BaseJsonTable) table, condition, false, projections);
+ int recordCount = 0;
+ DBDocumentReaderBase reader = null;
+
+ int maxRecordsForThisBatch = this.maxRecordsToRead > 0 ?
+ Math.min(rss.getMaxRowKeysToBeRead(), this.maxRecordsToRead) :
+ this.maxRecordsToRead == -1 ? rss.getMaxRowKeysToBeRead() : 0;
+
+ Stopwatch timer = Stopwatch.createUnstarted();
+
+ while (recordCount < maxRecordsForThisBatch) {
+ ByteBuffer[] rowKeyIds = rss.getRowKeyIdsToRead(batchSize);
+ if (rowKeyIds == null) {
+ break;
+ }
+ try {
+ timer.start();
+ final List<Document> docList = multiGet.doGet(rowKeyIds);
+ int index = 0;
+ long docsToRead = docList.size();
+ // If a limit was pushed down, stop once we have 'limit' rows from the multi-get, i.e. maxRecordsForThisBatch.
+ if (this.maxRecordsToRead != -1) {
+ docsToRead = Math.min(docsToRead, maxRecordsForThisBatch);
+ }
+ while (index < docsToRead) {
+ vectorWriter.setPosition(recordCount);
+ reader = (DBDocumentReaderBase) docList.get(index).asReader();
+ documentWriter.writeDBDocument(vectorWriter, reader);
+ recordCount++;
+ index++;
+ }
+ timer.stop();
+ } catch (UserException e) {
+ throw UserException.unsupportedError(e).addContext(String.format("Table: %s, document id: '%s'",
+ getTable().getPath(), reader == null ? null : IdCodec.asString(reader.getId()))).build(logger);
+ } catch (SchemaChangeException e) {
+ if (getIgnoreSchemaChange()) {
+ logger.warn("{}. Dropping the row from result.", e.getMessage());
+ logger.debug("Stack trace:", e);
+ } else {
+ throw dataReadError(logger, e);
+ }
+ }
+ }
+
+ vectorWriter.setValueCount(recordCount);
+ if (maxRecordsToRead > 0) {
+ maxRecordsToRead = Math.max(0, maxRecordsToRead - recordCount);
+ }
+
+ logger.debug("Took {} ms to get {} records, getrowkey {}", watch.elapsed(TimeUnit.MILLISECONDS), recordCount, timer.elapsed(TimeUnit.MILLISECONDS));
+ return recordCount;
+ }
+
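+ // hasNext() is driven by the state of the enclosing row-key join: report more data while
+ // the join is building the schema, or while row keys remain and the pushed-down limit
+ // (if any) has not been exhausted.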
+ @Override
+ public boolean hasNext() {
+ RestrictedMapRDBSubScanSpec rss = (RestrictedMapRDBSubScanSpec) this.subScanSpec;
+
+ RowKeyJoin rjBatch = rss.getJoinForSubScan();
+ if (rjBatch == null) {
+ return false;
+ }
+
+ boolean hasMore = false;
+ AbstractRecordBatch.BatchState state = rss.getJoinForSubScan().getBatchState();
+ RowKeyJoin.RowKeyJoinState rkState = rss.getJoinForSubScan().getRowKeyJoinState();
+ if (state == AbstractRecordBatch.BatchState.BUILD_SCHEMA) {
+ hasMore = true;
+ } else if (state == AbstractRecordBatch.BatchState.FIRST) {
+ if (this.maxRecordsToRead > 0) {
+ rss.getJoinForSubScan().setBatchState(AbstractRecordBatch.BatchState.NOT_FIRST);
+ rss.getJoinForSubScan().setRowKeyJoinState(RowKeyJoin.RowKeyJoinState.PROCESSING);
+ hasMore = true;
+ }
+ } else if (rkState == RowKeyJoin.RowKeyJoinState.INITIAL) {
+ if (this.maxRecordsToRead > 0) {
+ rss.getJoinForSubScan().setRowKeyJoinState(RowKeyJoin.RowKeyJoinState.PROCESSING);
+ hasMore = true;
+ }
+ }
+
+ logger.debug("restricted reader hasMore = {}", hasMore);
+
+ return hasMore;
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RowCountVectorWriter.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RowCountVectorWriter.java
new file mode 100644
index 0000000..445bccb
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RowCountVectorWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+import com.mapr.db.ojai.DBDocumentReaderBase;
+
+/**
+ * This is an optimized implementation of DocumentReaderVectorWriter that writes the row count.
+ */
+class RowCountVectorWriter extends DocumentReaderVectorWriter {
+
+ protected RowCountVectorWriter(final OjaiValueWriter valueWriter) {
+ super(valueWriter);
+ }
+
+ @Override
+ protected void writeDBDocument(VectorContainerWriter vectorWriter, DBDocumentReaderBase reader)
+ throws SchemaChangeException {
+ vectorWriter.rootAsMap().bit("count").writeBit(1);
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/ConditionPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/ConditionPlaceholder.java
new file mode 100644
index 0000000..3bc36b3
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/ConditionPlaceholder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+/**
+ * This is a placeholder for the ojai_condition() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * exists to keep Calcite from complaining; the function is pushed down into
+ * MapR-DB by the storage plugin. That push-down goes through JsonConditionBuilder.java,
+ * which replaces this function with its real OJAI equivalent. Hence there is
+ * no implementation here.
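+ *
+ * The second argument is expected to be a Base64-encoded, serialized OJAI QueryCondition;
+ * OjaiFunctionsProcessor decodes it and rebuilds the condition via ConditionImpl.parseFrom().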
+ */
+@FunctionTemplate(
+ name="ojai_condition",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class ConditionPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) VarCharHolder pattern;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/DecodeFieldPath.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/DecodeFieldPath.java
new file mode 100644
index 0000000..6748c4f
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/DecodeFieldPath.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import javax.inject.Inject;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import io.netty.buffer.DrillBuf;
+
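+/**
+ * Decodes a comma-separated list of encoded field paths and returns them as a sorted,
+ * bracketed list of human-readable OJAI field path strings.
+ */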
+@FunctionTemplate(name = "maprdb_decode_fieldpath", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+public class DecodeFieldPath implements DrillSimpleFunc {
+ @Param VarCharHolder input;
+ @Output VarCharHolder out;
+
+ @Inject DrillBuf buffer;
+
+ @Override
+ public void setup() {
+ }
+
+ @Override
+ public void eval() {
+ String[] encodedPaths = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.
+ toStringFromUTF8(input.start, input.end, input.buffer).split(",");
+ String[] decodedPaths = org.apache.drill.exec.util.EncodedSchemaPathSet.decode(encodedPaths);
+ java.util.Arrays.sort(decodedPaths);
+
+ StringBuilder sb = new StringBuilder();
+ for(String decodedPath : decodedPaths) {
+ sb.append(", ").append(org.ojai.FieldPath.parseFrom(decodedPath).asPathString());
+ }
+ String outputString = "[" + sb.substring(2) + "]";
+ final byte[] strBytes = outputString.getBytes(com.google.common.base.Charsets.UTF_8);
+ buffer.setBytes(0, strBytes);
+ buffer.setIndex(0, strBytes.length);
+
+ out.start = 0;
+ out.end = strBytes.length;
+ out.buffer = buffer;
+ }
+
+}
+
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/MatchesPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/MatchesPlaceholder.java
new file mode 100644
index 0000000..6aad44e
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/MatchesPlaceholder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+/**
+ * This is a placeholder for the matches() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * exists to keep Calcite from complaining; the function is pushed down into
+ * MapR-DB by the storage plugin. That push-down goes through JsonConditionBuilder.java,
+ * which replaces this function with its real OJAI equivalent. Hence there is
+ * no implementation here.
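+ *
+ * Illustrative use (table and field names are hypothetical):
+ *   SELECT ... FROM t WHERE ojai_matches(t.doc.name, '^A.*')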
+ */
+@FunctionTemplate(
+ name="ojai_matches",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class MatchesPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) VarCharHolder pattern;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotMatchesPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotMatchesPlaceholder.java
new file mode 100644
index 0000000..56baebb
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotMatchesPlaceholder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+/**
+ * This is a placeholder for the notMatches() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * exists to keep Calcite from complaining; the function is pushed down into
+ * MapR-DB by the storage plugin. That push-down goes through JsonConditionBuilder.java,
+ * which replaces this function with its real OJAI equivalent. Hence there is
+ * no implementation here.
+ */
+@FunctionTemplate(
+ name="ojai_notmatches",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class NotMatchesPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) VarCharHolder pattern;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotTypeOfPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotTypeOfPlaceholder.java
new file mode 100644
index 0000000..6c01a48
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/NotTypeOfPlaceholder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+
+/**
+ * This is a placeholder for the nottypeof() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * is here to prevent Calcite from complaining; the function is pushed down
+ * into the DB by the storage plugin. That pushdown goes through JsonConditionBuilder.java,
+ * which replaces this function with its real OJAI equivalent. Hence there is
+ * no implementation here.
+ */
+@FunctionTemplate(
+ name="ojai_nottypeof",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class NotTypeOfPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) IntHolder typeCode;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/SizeOfPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/SizeOfPlaceholder.java
new file mode 100644
index 0000000..7d7150c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/SizeOfPlaceholder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+/**
+ * This is a placeholder for the sizeof() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * is here to prevent Calcite from complaining; the function is pushed down
+ * into the DB by the storage plugin. That pushdown goes through JsonConditionBuilder.java,
+ * which replaces this function with its real OJAI equivalent. Hence there is
+ * no implementation here.
+ */
+@FunctionTemplate(
+ name="ojai_sizeof",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class SizeOfPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) VarCharHolder relOp;
+ @Param(constant = true) BigIntHolder size;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/TypeOfPlaceholder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/TypeOfPlaceholder.java
new file mode 100644
index 0000000..1d1efc0
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/udf/mapr/db/TypeOfPlaceholder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.udf.mapr.db;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+
+/**
+ * This is a placeholder for the typeof() function.
+ *
+ * At this time, this function can only be used in predicates. The placeholder
+ * is here to prevent calcite from complaining; the function will get pushed down
+ * by the storage plug-in into DB. That process will go through JsonConditionBuilder.java,
+ * which will replace this function with the real OJAI equivalent to be pushed down.
+ * Therefore, there's no implementation here.
+ */
+@FunctionTemplate(
+ name="ojai_typeof",
+ scope=FunctionTemplate.FunctionScope.SIMPLE,
+ nulls=FunctionTemplate.NullHandling.INTERNAL)
+public class TypeOfPlaceholder implements DrillSimpleFunc {
+
+ @Param BigIntHolder /*FieldReader*/ field;
+ @Param(constant = true) IntHolder typeCode;
+
+ @Output BitHolder output;
+
+ public void setup() {
+ }
+
+ public void eval() {
+ }
+
+}
diff --git a/contrib/format-maprdb/src/main/resources/drill-module.conf b/contrib/format-maprdb/src/main/resources/drill-module.conf
index 8d42355..a4270c7 100644
--- a/contrib/format-maprdb/src/main/resources/drill-module.conf
+++ b/contrib/format-maprdb/src/main/resources/drill-module.conf
@@ -17,4 +17,22 @@
// This file can also include any supplementary configuration information.
// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
-# This file currently does not contain any configuration
\ No newline at end of file
+format-maprdb: {
+ json: {
+ mediaType: SSD,
+ scanSizeMB: 128,
+ restrictedScanSizeMB: 4096,
+ useNumRegionsForDistribution: false,
+ tableCache: {
+ enabled: true,
+ size: 1000,
+ expireTimeInMinutes: 60
+ },
+ pluginCost: {
+ blockSize: 8192
+ }
+ }
+}
+
+// this picks up the UDFs
+drill.classpath.scanning.packages += "org.apache.drill.exec.udf.mapr.db"
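These HOCON keys are plain Typesafe Config entries, so they can be read through Drill's config object. A minimal sketch, assuming a DrillConfig instance in scope (the variable wiring is illustrative; only the key paths come from the block above):

    // DrillConfig exposes the Typesafe Config API, so the keys above
    // resolve with ordinary typed lookups.
    int scanSizeMB = config.getInt("format-maprdb.json.scanSizeMB");                    // 128
    boolean tableCacheOn = config.getBoolean("format-maprdb.json.tableCache.enabled");  // true
    int blockSize = config.getInt("format-maprdb.json.pluginCost.blockSize");           // 8192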
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
index 0c81a18..edd5ab4 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
@@ -17,12 +17,12 @@
*/
package com.mapr.drill.maprdb.tests;
-import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.hbase.HBaseTestsSuite;
import org.apache.hadoop.conf.Configuration;
@@ -31,25 +31,23 @@ import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
-import org.ojai.Document;
-import org.ojai.DocumentStream;
-import org.ojai.json.Json;
import com.mapr.db.Admin;
import com.mapr.db.MapRDB;
-import com.mapr.db.Table;
import com.mapr.drill.maprdb.tests.binary.TestMapRDBFilterPushDown;
import com.mapr.drill.maprdb.tests.binary.TestMapRDBSimple;
+import com.mapr.drill.maprdb.tests.json.TestScanRanges;
import com.mapr.drill.maprdb.tests.json.TestSimpleJson;
@RunWith(Suite.class)
@SuiteClasses({
TestMapRDBSimple.class,
TestMapRDBFilterPushDown.class,
- TestSimpleJson.class
+ TestSimpleJson.class,
+ TestScanRanges.class
})
public class MaprDBTestsSuite {
- private static final String TMP_BUSINESS_TABLE = "/tmp/business";
+ public static final int INDEX_FLUSH_TIMEOUT = 60000;
private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
@@ -65,13 +63,13 @@ public class MaprDBTestsSuite {
if (initCount.get() == 0) {
HBaseTestsSuite.configure(false /*manageHBaseCluster*/, true /*createTables*/);
HBaseTestsSuite.initCluster();
- createJsonTables();
// Sleep to allow table data to be flushed to tables.
// Without this, the row count stats return 0,
// causing the planner to reject optimized plans.
Thread.sleep(5000);
+ admin = MapRDB.newAdmin();
conf = HBaseTestsSuite.getConf();
initCount.incrementAndGet(); // must increment while inside the synchronized block
return;
@@ -87,17 +85,19 @@ public class MaprDBTestsSuite {
synchronized (MaprDBTestsSuite.class) {
if (initCount.decrementAndGet() == 0) {
HBaseTestsSuite.tearDownCluster();
- deleteJsonTables();
+ admin.close();
}
}
}
- private static volatile boolean pluginCreated;
+ private static volatile boolean pluginsUpdated;
public static Configuration createPluginAndGetConf(DrillbitContext ctx) throws Exception {
- if (!pluginCreated) {
+ if (!pluginsUpdated) {
synchronized (MaprDBTestsSuite.class) {
- if (!pluginCreated) {
+ if (!pluginsUpdated) {
+ StoragePluginRegistry pluginRegistry = ctx.getStorage();
+
String pluginConfStr = "{" +
" \"type\": \"file\"," +
" \"enabled\": true," +
@@ -108,6 +108,11 @@ public class MaprDBTestsSuite {
" \"writable\": false," +
" \"defaultInputFormat\": \"maprdb\"" +
" }," +
+ " \"tmp\": {" +
+ " \"location\": \"/tmp\"," +
+ " \"writable\": true," +
+ " \"defaultInputFormat\": \"parquet\"" +
+ " }," +
" \"root\": {" +
" \"location\": \"/\"," +
" \"writable\": false," +
@@ -121,6 +126,9 @@ public class MaprDBTestsSuite {
" \"readAllNumbersAsDouble\": false," +
" \"enablePushdown\": true" +
" }," +
+ " \"parquet\": {" +
+ " \"type\": \"parquet\"" +
+ " }," +
" \"streams\": {" +
" \"type\": \"streams\"" +
" }" +
@@ -129,7 +137,7 @@ public class MaprDBTestsSuite {
FileSystemConfig pluginConfig = ctx.getLpPersistence().getMapper().readValue(pluginConfStr, FileSystemConfig.class);
// create the plugin with "hbase" name so that we can run HBase unit tests against them
- ctx.getStorage().createOrUpdate("hbase", pluginConfig, true);
+ pluginRegistry.createOrUpdate("hbase", pluginConfig, true);
}
}
}
@@ -140,33 +148,12 @@ public class MaprDBTestsSuite {
return IS_DEBUG;
}
- public static InputStream getJsonStream(String resourceName) {
- return MaprDBTestsSuite.class.getClassLoader().getResourceAsStream(resourceName);
- }
-
- public static void createJsonTables() throws IOException {
- admin = MapRDB.newAdmin();
- if (admin.tableExists(TMP_BUSINESS_TABLE)) {
- admin.deleteTable(TMP_BUSINESS_TABLE);
- }
-
- try (Table table = admin.createTable(TMP_BUSINESS_TABLE);
- InputStream in = getJsonStream("json/business.json");
- DocumentStream stream = Json.newDocumentStream(in)) {
- for (Document document : stream) {
- table.insert(document, "business_id");
- }
- table.flush();
- }
+ public static Admin getAdmin() {
+ return admin;
}
- public static void deleteJsonTables() {
- if (admin != null) {
- if (admin.tableExists(TMP_BUSINESS_TABLE)) {
- admin.deleteTable(TMP_BUSINESS_TABLE);
- }
- admin.close();
- }
+ public static InputStream getJsonStream(String resourceName) {
+ return MaprDBTestsSuite.class.getResourceAsStream(resourceName);
}
}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
index 550fb73..2d6ae06 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
@@ -29,6 +29,19 @@ import org.junit.BeforeClass;
import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
public class BaseJsonTest extends BaseTestQuery {
+ protected static final String SCHEMA = "hbase.root";
+
+ protected String format(final String sql) {
+ return String.format(sql, SCHEMA, getTablePath());
+ }
+
+ protected String getTablePath() {
+ throw new RuntimeException("unimplemented");
+ }
+
+ public static String format(final String sql, final String tablePath) {
+ return String.format(sql, SCHEMA, tablePath);
+ }
@BeforeClass
public static void setupDefaultTestCluster() throws Exception {
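To make the new helpers concrete: SCHEMA is fixed to "hbase.root", and the two %s placeholders in every test query are filled with the schema and the table path, so a subclass whose getTablePath() returns "/tmp/business" (an illustrative value) gets:

    // format("SELECT * FROM %s.`%s` t")
    // expands to: SELECT * FROM hbase.root.`/tmp/business` t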
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestEncodedFieldPaths.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestEncodedFieldPaths.java
new file mode 100644
index 0000000..dcbc440
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestEncodedFieldPaths.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.json;
+
+import static com.mapr.drill.maprdb.tests.MaprDBTestsSuite.INDEX_FLUSH_TIMEOUT;
+
+import java.io.InputStream;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.util.EncodedSchemaPathSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.ojai.DocumentStream;
+import org.ojai.json.Json;
+
+import com.google.common.collect.ImmutableMap;
+import com.mapr.db.Table;
+import com.mapr.db.tests.utils.DBTests;
+
+public class TestEncodedFieldPaths extends BaseJsonTest {
+
+ private static final String TABLE_NAME = "encoded_fields_userdata_table";
+ private static final String INDEX_NAME = "encoded_fields_userdata_index";
+ private static final String JSON_FILE_URL = "/com/mapr/drill/json/encoded_fields_userdata.json";
+
+ private static boolean tableCreated = false;
+ private static String tablePath;
+
+ @BeforeClass
+ public static void setup_TestEncodedFieldPaths() throws Exception {
+ try (Table table = DBTests.createOrReplaceTable(TABLE_NAME, ImmutableMap.of("codes", "codes"))) {
+ tableCreated = true;
+ tablePath = table.getPath().toUri().getPath();
+
+ DBTests.createIndex(TABLE_NAME, INDEX_NAME, new String[] {"age"}, new String[] {"name.last", "data.salary"});
+ DBTests.admin().getTableIndexes(table.getPath(), true);
+
+ try (final InputStream in = TestEncodedFieldPaths.class.getResourceAsStream(JSON_FILE_URL);
+ final DocumentStream stream = Json.newDocumentStream(in);) {
+ table.insertOrReplace(stream);
+ table.flush();
+ }
+
+ // wait for the indexes to sync
+ DBTests.waitForRowCount(table.getPath(), 5, INDEX_FLUSH_TIMEOUT);
+ DBTests.waitForIndexFlush(table.getPath(), INDEX_FLUSH_TIMEOUT);
+ } finally {
+ test("ALTER SESSION SET `planner.disable_full_table_scan` = true");
+ }
+ }
+
+ @AfterClass
+ public static void cleanup_TestEncodedFieldPaths() throws Exception {
+ test("ALTER SESSION SET `planner.disable_full_table_scan` = false");
+ if (tableCreated) {
+ DBTests.deleteTables(TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void test_encoded_fields_with_non_covering_index() throws Exception {
+ final String sql = String.format(
+ "SELECT\n"
+ + " t.`%s`,t.`$$document`\n"
+ + "FROM\n"
+ + " hbase.root.`%s` t\n"
+ + "WHERE (t.`age` > 20)\n"
+ + "ORDER BY t.`_id` ASC",
+ EncodedSchemaPathSet.encode("_id", "codes")[0],
+ tablePath);
+
+ setColumnWidths(new int[] {20, 60});
+ runSQLAndVerifyCount(sql, 3);
+
+
+ // plan test
+ final String[] expectedPlan = {"JsonTableGroupScan.*indexName=encoded_fields_userdata_index.*" + // scan on index
+ "columns=\\[`_id`, `age`\\]",
+ "RestrictedJsonTableGroupScan.*" + // restricted scan on the table with encoded name
+ "columns=\\[`age`, `\\$\\$ENC00L5UWIADDN5SGK4Y`, `\\$\\$document`, `_id`\\]",
+ "RowKeyJoin"}; // join on row_key
+ final String[] excludedPlan = {};
+
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+ }
+
+ @Test
+ public void test_encoded_fields_with_covering_index() throws Exception {
+ final String sql = String.format(
+ "SELECT\n"
+ + " t.`%s`,t.`$$document`\n"
+ + "FROM\n"
+ + " hbase.root.`%s` t\n"
+ + "WHERE (t.`age` > 10)\n"
+ + "ORDER BY t.`_id` ASC",
+ EncodedSchemaPathSet.encode("name.last", "data.salary")[0],
+ tablePath);
+
+ setColumnWidths(new int[] {20, 60});
+ runSQLAndVerifyCount(sql, 4);
+
+
+ // plan test
+ final String[] expectedPlan = {"JsonTableGroupScan.*indexName=encoded_fields_userdata_index.*", // scan on index
+ "columns=\\[`age`, `\\$\\$ENC00NZQW2ZJONRQXG5AAMRQXIYJOONQWYYLSPE`, `\\$\\$document`, `_id`\\]"};
+ final String[] excludedPlan = {"RestrictedJsonTableGroupScan", // restricted scan on the table
+ "RowKeyJoin"}; // join on row_key
+
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+ }
+
+}
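The $$ENC... column names asserted in the plans above come from EncodedSchemaPathSet.encode(), which packs a set of field paths into one or more identifier-safe column names. A small sketch, using the same arguments as the non-covering test (the printed value is the one that test's expected plan asserts):

    import org.apache.drill.exec.util.EncodedSchemaPathSet;

    String[] encoded = EncodedSchemaPathSet.encode("_id", "codes");
    System.out.println(encoded[0]);  // $$ENC00L5UWIADDN5SGK4Y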
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestFieldPathHelper.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestFieldPathHelper.java
new file mode 100644
index 0000000..914ad96
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestFieldPathHelper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.json;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.mapr.db.json.FieldPathHelper;
+import org.junit.Test;
+import org.ojai.FieldPath;
+
+public class TestFieldPathHelper {
+
+ @Test
+ public void simpleTests() {
+ String[] pathStrs = {"a", "a.b", "a.b.c", "a[1].b[2].c", "a[0][1][2][3].b"};
+ FieldPath[] fieldPaths = new FieldPath[pathStrs.length];
+ SchemaPath[] schemaPaths = new SchemaPath[pathStrs.length];
+
+ // build
+ for (int i = 0; i < pathStrs.length; i++) {
+ String path = pathStrs[i];
+ fieldPaths[i] = FieldPath.parseFrom(path);
+ schemaPaths[i] = SchemaPath.parseFromString(path);
+ }
+
+ // verify
+ for (int i = 0; i < pathStrs.length; i++) {
+ FieldPath fp = FieldPathHelper.schemaPath2FieldPath(schemaPaths[i]);
+ assertEquals(fieldPaths[i], fp);
+
+ SchemaPath sp = FieldPathHelper.fieldPath2SchemaPath(fieldPaths[i]);
+ assertEquals(schemaPaths[i], sp);
+ }
+ }
+
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java
new file mode 100644
index 0000000..3ec27d9
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.work.foreman.QueryManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.ojai.Document;
+import org.ojai.DocumentStream;
+import org.ojai.json.Json;
+
+import com.google.common.collect.Lists;
+import com.mapr.db.Table;
+import com.mapr.db.tests.utils.DBTests;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.tests.annotations.ClusterTest;
+
+@Category(ClusterTest.class)
+public class TestScanRanges extends BaseJsonTest {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestScanRanges.class);
+
+ private static final int TOTAL_ROW_COUNT = 1000000;
+ private static final String TABLE_NAME = "large_table_TestScanRanges";
+ private static final String JSON_FILE_URL = "/com/mapr/drill/json/business.json";
+
+ private static boolean tableCreated = false;
+ private static String tablePath;
+ protected String getTablePath() {
+ return tablePath;
+ }
+
+ @BeforeClass
+ public static void setup_TestScanRanges() throws Exception {
+ // We create a large table with auto-split disabled, so it stays in a single tablet.
+ // Without intra-tablet partitioning, this test would run with only one minor fragment.
+ try (Table table = DBTests.createOrReplaceTable(TABLE_NAME, false /*autoSplit*/);
+ InputStream in = MaprDBTestsSuite.getJsonStream(JSON_FILE_URL);
+ DocumentStream stream = Json.newDocumentStream(in)) {
+ tableCreated = true;
+ tablePath = table.getPath().toUri().getPath();
+
+ List<Document> docs = Lists.newArrayList(stream);
+ for (char ch = 'A'; ch <= 'T'; ch++) {
+ for (int rowIndex = 0; rowIndex < 5000; rowIndex++) {
+ for (int i = 0; i < docs.size(); i++) {
+ final Document document = docs.get(i);
+ final String id = String.format("%c%010d%03d", ch, rowIndex, i);
+ document.set("documentId", rowIndex);
+ table.insertOrReplace(id, document);
+ }
+ }
+ }
+ table.flush();
+ DBTests.waitForRowCount(table.getPath(), TOTAL_ROW_COUNT);
+
+ setSessionOption("planner.width.max_per_node", "5");
+ }
+ }
+
+ @AfterClass
+ public static void cleanup_TestScanRanges() throws Exception {
+ if (tableCreated) {
+ DBTests.deleteTables(TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void test_scan_ranges() throws Exception {
+ final PersistentStore<UserBitShared.QueryProfile> completed = getDrillbitContext().getProfileStoreContext().getCompletedProfileStore();
+
+ setColumnWidths(new int[] {25, 40, 25, 45});
+ final String sql = format("SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " %s.`%s` business");
+
+ final SilentListener resultListener = new SilentListener();
+ final AwaitableUserResultsListener listener = new AwaitableUserResultsListener(resultListener);
+ testWithListener(QueryType.SQL, sql, listener);
+ listener.await();
+
+ assertEquals(TOTAL_ROW_COUNT, resultListener.getRowCount());
+ String queryId = QueryIdHelper.getQueryId(resultListener.getQueryId());
+
+ QueryProfile profile = completed.get(queryId);
+ String profileString = String.valueOf(profile);
+ logger.debug(profileString);
+ assertNotNull(profile);
+ assertTrue(profile.getTotalFragments() >= 5); // should be at least as many fragments as planner.width.max_per_node (5)
+ }
+
+ @Test
+ public void test_scan_ranges_with_filter_on_id() throws Exception {
+ setColumnWidths(new int[] {25, 25, 25});
+ final String sql = format("SELECT\n"
+ + " _id, business_id, city\n"
+ + "FROM\n"
+ + " %s.`%s` business\n"
+ + "WHERE\n"
+ + " _id > 'M' AND _id < 'Q'");
+
+ final SilentListener resultListener = new SilentListener();
+ final AwaitableUserResultsListener listener = new AwaitableUserResultsListener(resultListener);
+ testWithListener(QueryType.SQL, sql, listener);
+ listener.await();
+
+ assertEquals(200000, resultListener.getRowCount());
+ }
+
+ @Test
+ public void test_scan_ranges_with_filter_on_non_id_field() throws Exception {
+ setColumnWidths(new int[] {25, 25, 25});
+ final String sql = format("SELECT\n"
+ + " _id, business_id, documentId\n"
+ + "FROM\n"
+ + " %s.`%s` business\n"
+ + "WHERE\n"
+ + " documentId >= 100 AND documentId < 150");
+
+ final SilentListener resultListener = new SilentListener();
+ final AwaitableUserResultsListener listener = new AwaitableUserResultsListener(resultListener);
+ testWithListener(QueryType.SQL, sql, listener);
+ listener.await();
+
+ assertEquals(10000, resultListener.getRowCount());
+ }
+
+}
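For reference, the asserted row counts follow from the data layout in the setup method: 20 key prefixes ('A'..'T') x 5000 rowIndex values x 10 documents per pass over business.json (the file's document count is inferred from testSelectStar in TestSimpleJson) give the 1,000,000-row total; the _id range ('M','Q') spans the four prefixes M..P, i.e. 4 x 5000 x 10 = 200,000 rows; and documentId in [100, 150) keeps 50 rowIndex values across all prefixes, i.e. 50 x 20 x 10 = 10,000 rows.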
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index 26f54b8..4dbff5d 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.io.InputStream;
+
import org.apache.drill.PlanTestBase;
import org.apache.drill.SingleRowListener;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -28,64 +30,102 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.util.VectorUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
+import org.ojai.DocumentStream;
+import org.ojai.json.Json;
-import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.tests.utils.DBTests;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
import com.mapr.tests.annotations.ClusterTest;
@Category(ClusterTest.class)
public class TestSimpleJson extends BaseJsonTest {
+ private static final String TABLE_NAME = "business";
+ private static final String JSON_FILE_URL = "/com/mapr/drill/json/business.json";
+
+ private static boolean tableCreated = false;
+ private static String tablePath;
+ protected String getTablePath() {
+ return tablePath;
+ }
+
+ @BeforeClass
+ public static void setup_TestSimpleJson() throws Exception {
+ try (Table table = DBTests.createOrReplaceTable(TABLE_NAME);
+ InputStream in = MaprDBTestsSuite.getJsonStream(JSON_FILE_URL);
+ DocumentStream stream = Json.newDocumentStream(in)) {
+ tableCreated = true;
+ tablePath = table.getPath().toUri().getPath();
+
+ for (Document document : stream) {
+ table.insert(document, "business_id");
+ }
+ table.flush();
+ }
+ }
+
+ @AfterClass
+ public static void cleanup_TestSimpleJson() throws Exception {
+ if (tableCreated) {
+ DBTests.deleteTables(TABLE_NAME);
+ }
+ }
+
@Test
public void testSelectStar() throws Exception {
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " *\n"
+ "FROM\n"
- + " hbase.`business` business";
+ + " %s.`%s` business");
runSQLAndVerifyCount(sql, 10);
}
@Test
public void testSelectId() throws Exception {
setColumnWidths(new int[] {23});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id\n"
+ "FROM\n"
- + " hbase.`business` business";
+ + " %s.`%s` business");
runSQLAndVerifyCount(sql, 10);
}
@Test
public void testSelectNonExistentColumns() throws Exception {
setColumnWidths(new int[] {23});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " something\n"
+ "FROM\n"
- + " hbase.business business limit 5";
+ + " %s.`%s` business limit 5");
runSQLAndVerifyCount(sql, 5);
}
@Test
public void testKVGen() throws Exception {
setColumnWidths(new int[] {21, 10, 6});
- final String sql = "select _id, t.parking[0].`key` K, t.parking[0].`value` V from"
- + " (select _id, kvgen(b.attributes.Parking) as parking from hbase.business b)"
- + " as t where t.parking[0].`key` = 'garage' AND t.parking[0].`value` = true";
+ final String sql = format("select _id, t.parking[0].`key` K, t.parking[0].`value` V from"
+ + " (select _id, kvgen(b.attributes.Parking) as parking from %s.`%s` b)"
+ + " as t where t.parking[0].`key` = 'garage' AND t.parking[0].`value` = true");
runSQLAndVerifyCount(sql, 1);
}
@Test
public void testPushdownDisabled() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " table(hbase.`business`(type => 'maprdb', enablePushdown => false)) business\n"
+ + " table(%s.`%s`(type => 'maprdb', enablePushdown => false)) business\n"
+ "WHERE\n"
- + " name <> 'Sprint'"
- ;
+ + " name <> 'Sprint'");
runSQLAndVerifyCount(sql, 9);
final String[] expectedPlan = {"condition=null", "columns=\\[`\\*`\\]"};
@@ -97,15 +137,14 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushdownStringEqual() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, business.hours.Monday.`open`, categories[1], years[2], full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " name = 'Sprint'"
- ;
+ + " name = 'Sprint'");
- final Document queryResult = MapRDB.newDocument();
+ final Document queryResult = MapRDBImpl.newDocument();
SingleRowListener listener = new SingleRowListener() {
@Override
protected void rowArrived(QueryDataBatch result) {
@@ -141,13 +180,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushdownStringLike() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " name LIKE 'S%'"
- ;
+ + " name LIKE 'S%%'");
runSQLAndVerifyCount(sql, 3);
final String[] expectedPlan = {"condition=\\(name MATCHES \"\\^\\\\\\\\QS\\\\\\\\E\\.\\*\\$\"\\)"};
@@ -159,13 +197,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushdownStringNotEqual() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " name <> 'Sprint'"
- ;
+ + " name <> 'Sprint'");
runSQLAndVerifyCount(sql, 9);
final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"};
@@ -177,13 +214,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushdownLongEqual() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " zip = 85260"
- ;
+ + " zip = 85260");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(zip = \\{\"\\$numberLong\":85260\\}\\)"};
@@ -195,15 +231,14 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testCompositePredicate() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
+ " zip = 85260\n"
+ " OR\n"
- + " city = 'Las Vegas'"
- ;
+ + " city = 'Las Vegas'");
runSQLAndVerifyCount(sql, 4);
final String[] expectedPlan = {"condition=\\(\\(zip = \\{\"\\$numberLong\":85260\\}\\) or \\(city = \"Las Vegas\"\\)\\)"};
@@ -215,13 +250,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPruneScanRange() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " _id = 'jFTZmywe7StuZ2hEjxyA'"
- ;
+ + " _id = 'jFTZmywe7StuZ2hEjxyA'");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\)"};
@@ -231,17 +265,17 @@ public class TestSimpleJson extends BaseJsonTest {
}
@Test
+ @Ignore("Bug 27981")
public void testPruneScanRangeAndPushDownCondition() throws Exception {
// XXX/TODO:
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, categories, full_address\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
+ " _id = 'jFTZmywe7StuZ2hEjxyA' AND\n"
- + " name = 'Subway'"
- ;
+ + " name = 'Subway'");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\) and \\(name = \"Subway\"\\)\\)"};
@@ -253,12 +287,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownOnSubField1() throws Exception {
setColumnWidths(new int[] {25, 120, 20});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, b.attributes.Ambience.touristy attributes\n"
+ "FROM\n"
- + " hbase.`business` b\n"
+ + " %s.`%s` b\n"
+ "WHERE\n"
- + " b.attributes.Ambience.casual = false";
+ + " b.attributes.Ambience.casual = false");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = false\\)"};
@@ -270,13 +304,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownOnSubField2() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, b.attributes.Attire attributes\n"
+ "FROM\n"
- + " hbase.`business` b\n"
+ + " %s.`%s` b\n"
+ "WHERE\n"
- + " b.attributes.Attire = 'casual'"
- ;
+ + " b.attributes.Attire = 'casual'");
runSQLAndVerifyCount(sql, 4);
final String[] expectedPlan = {"condition=\\(attributes.Attire = \"casual\"\\)"};
@@ -288,13 +321,12 @@ public class TestSimpleJson extends BaseJsonTest {
public void testPushDownIsNull() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, attributes\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " business.attributes.Ambience.casual IS NULL"
- ;
+ + " business.attributes.Ambience.casual IS NULL");
runSQLAndVerifyCount(sql, 7);
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = null\\)"};
@@ -307,13 +339,12 @@ public class TestSimpleJson extends BaseJsonTest {
public void testPushDownIsNotNull() throws Exception {
setColumnWidths(new int[] {25, 75, 75, 50});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, b.attributes.Parking\n"
+ "FROM\n"
- + " hbase.`business` b\n"
+ + " %s.`%s` b\n"
+ "WHERE\n"
- + " b.attributes.Ambience.casual IS NOT NULL"
- ;
+ + " b.attributes.Ambience.casual IS NOT NULL");
runSQLAndVerifyCount(sql, 3);
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual != null\\)"};
@@ -325,13 +356,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownOnSubField3() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, b.attributes.`Accepts Credit Cards` attributes\n"
+ "FROM\n"
- + " hbase.`business` b\n"
+ + " %s.`%s` b\n"
+ "WHERE\n"
- + " b.attributes.`Accepts Credit Cards` IS NULL"
- ;
+ + " b.attributes.`Accepts Credit Cards` IS NULL");
runSQLAndVerifyCount(sql, 3);
final String[] expectedPlan = {"condition=\\(attributes.Accepts Credit Cards = null\\)"};
@@ -342,13 +372,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownLong() throws Exception {
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " *\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " stars > 4.0"
- ;
+ + " stars > 4.0");
runSQLAndVerifyCount(sql, 2);
final String[] expectedPlan = {"condition=\\(stars > 4\\)"};
@@ -359,14 +388,13 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownSubField4() throws Exception {
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " *\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
+ " business.attributes.`Good For`.lunch = true AND"
- + " stars > 4.1"
- ;
+ + " stars > 4.1");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(\\(attributes.Good For.lunch = true\\) and \\(stars > 4.1\\)\\)"};
@@ -378,13 +406,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownSubField5() throws Exception {
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " *\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " business.hours.Tuesday.`open` < TIME '10:30:00'"
- ;
+ + " business.hours.Tuesday.`open` < TIME '10:30:00'");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(hours.Tuesday.open < \\{\"\\$time\":\"10:30:00\"\\}\\)"};
@@ -395,13 +422,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownSubField6() throws Exception {
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " *\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " business.hours.Sunday.`close` > TIME '20:30:00'"
- ;
+ + " business.hours.Sunday.`close` > TIME '20:30:00'");
runSQLAndVerifyCount(sql, 3);
final String[] expectedPlan = {"condition=\\(hours.Sunday.close > \\{\"\\$time\":\"20:30:00\"\\}\\)"};
@@ -413,13 +439,12 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownSubField7() throws Exception {
setColumnWidths(new int[] {25, 40, 25, 45});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, start_date, last_update\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " business.`start_date` = DATE '2012-07-14'"
- ;
+ + " business.`start_date` = DATE '2012-07-14'");
runSQLAndVerifyCount(sql, 1);
final String[] expectedPlan = {"condition=\\(start_date = \\{\"\\$dateDay\":\"2012-07-14\"\\}\\)"};
@@ -431,19 +456,34 @@ public class TestSimpleJson extends BaseJsonTest {
@Test
public void testPushDownSubField8() throws Exception {
setColumnWidths(new int[] {25, 40, 25, 45});
- final String sql = "SELECT\n"
+ final String sql = format("SELECT\n"
+ " _id, name, start_date, last_update\n"
+ "FROM\n"
- + " hbase.`business` business\n"
+ + " %s.`%s` business\n"
+ "WHERE\n"
- + " business.`last_update` = TIMESTAMP '2012-10-20 07:42:46'"
- ;
+ + " business.`last_update` = TIMESTAMP '2012-10-20 07:42:46'");
runSQLAndVerifyCount(sql, 1);
- final String[] expectedPlan = {"condition=null"};
- final String[] excludedPlan = {"condition=\\(last_update = \\{\"\\$date\":\"2012-10-20T07:42:46.000Z\"\\}\\)"};
+ final String[] expectedPlan = {"condition=\\(last_update = \\{\"\\$date\":\"2012-10-20T07:42:46.000Z\"\\}\\)"};
+ final String[] excludedPlan = {};
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
+ @Test
+ public void testLimit() throws Exception {
+ final String sql = format("SELECT\n"
+ + " _id, name, start_date, last_update\n"
+ + "FROM\n"
+ + " %s.`%s` business\n"
+ + "limit 1"
+ );
+
+ final String[] expectedPlan = {"JsonTableGroupScan.*limit=1"};
+ final String[] excludedPlan = {};
+
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+ runSQLAndVerifyCount(sql, 1);
+ }
+
}
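The condition=... fragments matched in the plans above are the string forms of the OJAI query conditions that JsonConditionBuilder builds during pushdown. As a hedged sketch of what the LIKE 'S%%' case amounts to, hand-built against the OJAI API (the builder chain is an assumption; the regex literal is the one the expected plan encodes):

    import org.ojai.store.QueryCondition;
    import com.mapr.db.MapRDB;

    // Approximate server-side equivalent of: WHERE name LIKE 'S%'
    QueryCondition cond = MapRDB.newCondition()
        .matches("name", "^\\QS\\E.*$")
        .build();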
diff --git a/contrib/format-maprdb/src/test/resources/json/business.json b/contrib/format-maprdb/src/test/resources/com/mapr/drill/json/business.json
similarity index 100%
rename from contrib/format-maprdb/src/test/resources/json/business.json
rename to contrib/format-maprdb/src/test/resources/com/mapr/drill/json/business.json
diff --git a/contrib/format-maprdb/src/test/resources/com/mapr/drill/json/encoded_fields_userdata.json b/contrib/format-maprdb/src/test/resources/com/mapr/drill/json/encoded_fields_userdata.json
new file mode 100644
index 0000000..3295ba5
--- /dev/null
+++ b/contrib/format-maprdb/src/test/resources/com/mapr/drill/json/encoded_fields_userdata.json
@@ -0,0 +1,5 @@
+{"_id":"user001", "age":43, "name": {"first":"Sam", "last":"Harris"}, "codes": [1, "x@#ss2", 9.0], "data": {"salary": {"$numberLong": 125000}}}
+{"_id":"user002", "age":12, "name": {"first":"Leon", "last":"Russel"}, "codes": ["JA32S"], "data": {"salary": "170200"}}
+{"_id":"user003", "age":"87", "name": {"first":"David", "last":"Bowie"}, "codes": [236.35], "data": {"salary": {"$numberLong": 185200}}}
+{"_id":"user004", "age":56, "name": {"first":"Bob", "last":"Dylan"}, "codes": [{"$date": "1989-07-4"}, 0], "data": {"salary": {"$numberLong": 136900}}}
+{"_id":"user005", "age":54, "name": {"first":"David", "last":"Ackert"}, "codes": [{"a": 25, "b": "yas"}], "data": {"salary": {"$numberLong": 112850}}}
diff --git a/contrib/format-maprdb/src/test/resources/hbase-site.xml b/contrib/format-maprdb/src/test/resources/core-site.xml
similarity index 90%
rename from contrib/format-maprdb/src/test/resources/hbase-site.xml
rename to contrib/format-maprdb/src/test/resources/core-site.xml
index ec26f70..7ea9704 100644
--- a/contrib/format-maprdb/src/test/resources/hbase-site.xml
+++ b/contrib/format-maprdb/src/test/resources/core-site.xml
@@ -25,4 +25,9 @@
<value>*:/tmp/</value>
</property>
+ <property>
+ <name>fs.mapr.bailout.on.library.mismatch</name>
+ <value>false</value>
+ </property>
+
</configuration>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cddf3fe..a4eb369 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -72,6 +72,12 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
@Override
@JsonIgnore
+ public boolean isDistributed() {
+ return getMaxParallelizationWidth() > 1;
+ }
+
+ @Override
+ @JsonIgnore
public int getMinParallelizationWidth() {
return 1;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 655e3a9..263ef05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -55,6 +55,9 @@ public interface GroupScan extends Scan, HasAffinity{
@JsonIgnore
int getMaxParallelizationWidth();
+ @JsonIgnore
+ boolean isDistributed();
+
/**
* At minimum, the GroupScan requires these many fragments to run.
* Currently, this is used in {@link org.apache.drill.exec.planner.fragment.SimpleParallelizer}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index c474f2e..e1cc661 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.planner.physical;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import com.carrotsearch.hppc.IntIntHashMap;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRuleCall;
@@ -27,18 +30,28 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.ArraySegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
public class PrelUtil {
@@ -94,6 +107,121 @@ public class PrelUtil {
return lastUsed.getLastUsedReference();
}
+ public static ProjectPushInfo getColumns(RelDataType rowType, List<RexNode> projects) {
+ final List<String> fieldNames = rowType.getFieldNames();
+ if (fieldNames.isEmpty()) {
+ return null;
+ }
+
+ RefFieldsVisitor v = new RefFieldsVisitor(rowType);
+ for (RexNode exp : projects) {
+ PathSegment segment = exp.accept(v);
+ v.addColumn(segment);
+ }
+
+ return v.getInfo();
+
+ }
+
+ public static class DesiredField {
+ public final int origIndex;
+ public final String name;
+ public final RelDataTypeField field;
+
+ public DesiredField(int origIndex, String name, RelDataTypeField field) {
+ super();
+ this.origIndex = origIndex;
+ this.name = name;
+ this.field = field;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((field == null) ? 0 : field.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + origIndex;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ DesiredField other = (DesiredField) obj;
+ if (field == null) {
+ if (other.field != null) {
+ return false;
+ }
+ } else if (!field.equals(other.field)) {
+ return false;
+ }
+ if (name == null) {
+ if (other.name != null) {
+ return false;
+ }
+ } else if (!name.equals(other.name)) {
+ return false;
+ }
+ if (origIndex != other.origIndex) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ public static class ProjectPushInfo {
+ public final List<SchemaPath> columns;
+ public final List<DesiredField> desiredFields;
+ public final InputRewriter rewriter;
+ private final List<String> fieldNames;
+ private final List<RelDataType> types;
+
+ public ProjectPushInfo(List<SchemaPath> columns, ImmutableList<DesiredField> desiredFields) {
+ super();
+ this.columns = columns;
+ this.desiredFields = desiredFields;
+
+ this.fieldNames = Lists.newArrayListWithCapacity(desiredFields.size());
+ this.types = Lists.newArrayListWithCapacity(desiredFields.size());
+ IntIntHashMap oldToNewIds = new IntIntHashMap();
+
+ int i = 0;
+ for (DesiredField f : desiredFields) {
+ fieldNames.add(f.name);
+ types.add(f.field.getType());
+ oldToNewIds.put(f.origIndex, i);
+ i++;
+ }
+ this.rewriter = new InputRewriter(oldToNewIds);
+ }
+
+ public InputRewriter getInputRewriter() {
+ return rewriter;
+ }
+
+ public boolean isStarQuery() {
+ for (SchemaPath column : columns) {
+ if (column.getRootSegment().getPath().startsWith("*")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public RelDataType createNewRowType(RelDataTypeFactory factory) {
+ return factory.createStructType(types, fieldNames);
+ }
+ }
// Simple visitor class to determine the last used reference in the expression
private static class LastUsedRefVisitor extends RexVisitorImpl<Void> {
@@ -123,6 +251,69 @@ public class PrelUtil {
}
}
+ /** Visitor that finds the set of inputs that are used. */
+ private static class RefFieldsVisitor extends RexVisitorImpl<PathSegment> {
+ final Set<SchemaPath> columns = Sets.newLinkedHashSet();
+ final private List<String> fieldNames;
+ final private List<RelDataTypeField> fields;
+ final private Set<DesiredField> desiredFields = Sets.newLinkedHashSet();
+
+ public RefFieldsVisitor(RelDataType rowType) {
+ super(true);
+ this.fieldNames = rowType.getFieldNames();
+ this.fields = rowType.getFieldList();
+ }
+
+ public void addColumn(PathSegment segment) {
+ if (segment instanceof NameSegment) {
+ columns.add(new SchemaPath((NameSegment)segment));
+ }
+ }
+
+ public ProjectPushInfo getInfo() {
+ return new ProjectPushInfo(ImmutableList.copyOf(columns), ImmutableList.copyOf(desiredFields));
+ }
+
+ @Override
+ public PathSegment visitInputRef(RexInputRef inputRef) {
+ int index = inputRef.getIndex();
+ String name = fieldNames.get(index);
+ RelDataTypeField field = fields.get(index);
+ DesiredField f = new DesiredField(index, name, field);
+ desiredFields.add(f);
+ return new NameSegment(name);
+ }
+
+ @Override
+ public PathSegment visitCall(RexCall call) {
+ if ("ITEM".equals(call.getOperator().getName())) {
+ PathSegment mapOrArray = call.operands.get(0).accept(this);
+ if (mapOrArray != null) {
+ if (call.operands.get(1) instanceof RexLiteral) {
+ return mapOrArray.cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1)));
+ }
+ return mapOrArray;
+ }
+ } else {
+ for (RexNode operand : call.operands) {
+ addColumn(operand.accept(this));
+ }
+ }
+ return null;
+ }
+
+ private PathSegment convertLiteral(RexLiteral literal) {
+ switch (literal.getType().getSqlTypeName()) {
+ case CHAR:
+ return new NameSegment(RexLiteral.stringValue(literal));
+ case INTEGER:
+ return new ArraySegment(RexLiteral.intValue(literal));
+ default:
+ return null;
+ }
+ }
+
+ }
public static RelTraitSet fixTraits(RelOptRuleCall call, RelTraitSet set) {
return fixTraits(call.getPlanner(), set);
@@ -148,4 +339,44 @@ public class PrelUtil {
return newTraitSet;
}
+
+ public static class InputRefRemap {
+ private int oldIndex;
+ private int newIndex;
+
+ public InputRefRemap(int oldIndex, int newIndex) {
+ super();
+ this.oldIndex = oldIndex;
+ this.newIndex = newIndex;
+ }
+ public int getOldIndex() {
+ return oldIndex;
+ }
+ public int getNewIndex() {
+ return newIndex;
+ }
+
+ }
+
+ public static class InputRewriter extends RexShuttle {
+
+ final IntIntHashMap map;
+
+ public InputRewriter(IntIntHashMap map) {
+ super();
+ this.map = map;
+ }
+
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ return new RexInputRef(map.get(inputRef.getIndex()), inputRef.getType());
+ }
+
+ @Override
+ public RexNode visitLocalRef(RexLocalRef localRef) {
+ return new RexInputRef(map.get(localRef.getIndex()), localRef.getType());
+ }
+
+ }
+
}
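The new getColumns()/ProjectPushInfo machinery above is what MapRDBPushProjectIntoScan (listed in this commit's summary) is expected to drive. A hedged sketch of the intended flow, with the rule wiring and variable names assumed and only the helper calls taken from the code above:

    // Inside a push-project-into-scan rule: collect the referenced columns,
    // bail out on star queries, then remap the project's input refs onto the
    // narrowed scan's smaller row type.
    ProjectPushInfo info = PrelUtil.getColumns(scan.getRowType(), project.getProjects());
    if (info == null || info.isStarQuery()) {
      return;  // nothing useful to push down
    }
    RelDataType newRowType = info.createNewRowType(project.getCluster().getTypeFactory());
    List<RexNode> newExprs = new ArrayList<>();
    for (RexNode e : project.getProjects()) {
      newExprs.add(e.accept(info.getInputRewriter()));
    }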
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index e85b2c7..71aa240 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -480,6 +480,7 @@ public class BaseTestQuery extends ExecTest {
public static class SilentListener implements UserResultsListener {
private final AtomicInteger count = new AtomicInteger();
+ private QueryId queryId;
@Override
public void submissionFailed(UserException ex) {
@@ -501,8 +502,17 @@ public class BaseTestQuery extends ExecTest {
}
@Override
- public void queryIdArrived(QueryId queryId) {}
+ public void queryIdArrived(QueryId queryId) {
+ this.queryId = queryId;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+ public int getRowCount() {
+ return count.get();
+ }
}
protected void setColumnWidth(int columnWidth) {
diff --git a/pom.xml b/pom.xml
index dbd8caf..a75e251 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,8 +53,8 @@
<jackson.version>2.9.5</jackson.version>
<jackson.databind.version>2.9.5</jackson.databind.version>
<zookeeper.version>3.4.12</zookeeper.version>
- <mapr.release.version>5.2.1-mapr</mapr.release.version>
- <ojai.version>1.1</ojai.version>
+ <mapr.release.version>6.0.1-mapr</mapr.release.version>
+ <ojai.version>2.0.1-mapr-1804</ojai.version>
<kerby.version>1.0.0-RC2</kerby.version>
<findbugs.version>3.0.0</findbugs.version>
<netty.tcnative.classifier />