You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:03 UTC
[16/50] [abbrv] drill git commit: Adding support for Json tables.
Adding support for Json tables.
+ Refactored code to separate binary-table-specific code from the common code.
+ Added test cases
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f97a3332
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f97a3332
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f97a3332
Branch: refs/heads/master
Commit: f97a33321887458996cf7e311252fc9e8719feb3
Parents: e19c048
Author: Aditya <ad...@mapr.com>
Authored: Wed Oct 14 23:49:09 2015 -0700
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:31 2016 -0700
----------------------------------------------------------------------
contrib/format-maprdb/pom.xml | 221 ++++++--
.../store/maprdb/CompareFunctionsProcessor.java | 547 -------------------
.../exec/store/maprdb/MapRDBFilterBuilder.java | 356 ------------
.../exec/store/maprdb/MapRDBFormatPlugin.java | 51 +-
.../exec/store/maprdb/MapRDBGroupScan.java | 270 ++-------
.../store/maprdb/MapRDBPushFilterIntoScan.java | 144 -----
.../store/maprdb/MapRDBScanBatchCreator.java | 21 +-
.../drill/exec/store/maprdb/MapRDBSubScan.java | 177 ++----
.../exec/store/maprdb/MapRDBSubScanSpec.java | 113 ++++
.../exec/store/maprdb/MapRDBTableStats.java | 11 +-
.../exec/store/maprdb/TabletFragmentInfo.java | 108 ++++
.../maprdb/binary/BinaryTableGroupScan.java | 216 ++++++++
.../binary/CompareFunctionsProcessor.java | 547 +++++++++++++++++++
.../maprdb/binary/MapRDBFilterBuilder.java | 356 ++++++++++++
.../maprdb/binary/MapRDBPushFilterIntoScan.java | 141 +++++
.../store/maprdb/json/JsonTableGroupScan.java | 186 +++++++
.../maprdb/json/MaprDBJsonRecordReader.java | 386 +++++++++++++
.../drill/exec/store/maprdb/util/CommonFns.java | 26 +
.../drill/maprdb/tests/MaprDBTestsSuite.java | 162 ++++++
.../tests/binary/TestMapRDBFilterPushDown.java | 47 ++
.../maprdb/tests/binary/TestMapRDBSimple.java | 53 ++
.../drill/maprdb/tests/json/TestSimpleJson.java | 75 +++
.../src/test/resources/hbase-site.xml | 25 +
.../src/test/resources/json/business.json | 10 +
.../src/test/resources/logback.xml | 12 +-
25 files changed, 2771 insertions(+), 1490 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index 9e299b5..bcb6c29 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -15,102 +15,223 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
+
<parent>
- <artifactId>drill-contrib-parent</artifactId>
- <groupId>org.apache.drill.contrib</groupId>
- <version>1.5.0-SNAPSHOT</version>
+ <groupId>com.mapr</groupId>
+ <artifactId>mapr-release</artifactId>
+ <version>5.1.0-mapr-SNAPSHOT</version>
+ <relativePath/>
</parent>
<artifactId>drill-storage-maprdb</artifactId>
-
<name>maprdb-storage-plugin</name>
+ <version>1.5.0-SNAPSHOT</version>
<properties>
- <hbase.TestSuite>**/HBaseTestsSuite.class</hbase.TestSuite>
+ <mapr.version>${project.parent.version}</mapr.version>
+ <drill.version>${project.version}</drill.version>
+ <hbase.version>0.98.12-mapr-1506</hbase.version>
+ <maprdb.TestSuite>**/MaprDBTestsSuite.class</maprdb.TestSuite>
</properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>${maprdb.TestSuite}</include>
+ </includes>
+ <systemProperties>
+ <property>
+ <name>logback.log.dir</name>
+ <value>${project.build.directory}/surefire-reports</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
<dependencies>
<dependency>
+ <groupId>com.mapr.hadoop</groupId>
+ <artifactId>maprfs</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.mapr.fs</groupId>
+ <artifactId>mapr-hbase</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.mapr.db</groupId>
+ <artifactId>maprdb</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
- <version>${project.version}</version>
+ <version>${drill.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-storage-hbase</artifactId>
- <version>${project.version}</version>
+ <version>${drill.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hbase-client</artifactId>
+ <groupId>org.apache.hbase</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
+
+ <!-- Test dependencies -->
<dependency>
- <groupId>com.mapr.hadoop</groupId>
- <artifactId>maprfs</artifactId>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
</dependency>
<dependency>
- <groupId>com.mapr.fs</groupId>
- <artifactId>mapr-hbase</artifactId>
- <version>4.1.0-mapr</version>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>de.huxhorn.lilith</groupId>
+ <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
+ <version>0.9.44</version>
+ <scope>test</scope>
</dependency>
- <!-- Test dependencies -->
+ <dependency>
+ <groupId>com.mapr</groupId>
+ <artifactId>mapr-java-utils</artifactId>
+ <classifier>tests</classifier>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
+ <version>${drill.version}</version>
<classifier>tests</classifier>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
+ <version>${drill.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage-hbase</artifactId>
+ <version>${drill.version}</version>
<classifier>tests</classifier>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>2.1.1</version>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
<scope>test</scope>
+ <classifier>tests</classifier>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api-2.5</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
+
</dependencies>
<repositories>
<repository>
<id>mapr-releases</id>
- <url>http://repository.mapr.com/nexus/content/repositories/releases</url>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- </releases>
+ <url>http://repository.mapr.com/nexus/content/repositories/releases</url>
+ <snapshots><enabled>true</enabled></snapshots>
+ <releases><enabled>true</enabled></releases>
</repository>
</repositories>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <includes>
- <include>${hbase.TestSuite}</include>
- </includes>
- <systemProperties>
- <property>
- <name>hbase.test.root</name>
- <value>${project.build.directory}/data</value>
- </property>
- <property>
- <name>logback.log.dir</name>
- <value>${project.build.directory}/surefire-reports</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
deleted file mode 100644
index c6c2504..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.drill.common.expression.CastExpression;
-import org.apache.drill.common.expression.ConvertExpression;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DateExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.hadoop.hbase.util.Order;
-import org.apache.hadoop.hbase.util.PositionedByteRange;
-import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
-
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
- private byte[] value;
- private boolean success;
- private boolean isEqualityFn;
- private SchemaPath path;
- private String functionName;
- private boolean sortOrderAscending;
-
- // Fields for row-key prefix comparison
- // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter
- // Hence, we use these local variables(set depending upon the encoding type in user query)
- private boolean isRowKeyPrefixComparison;
- byte[] rowKeyPrefixStartRow;
- byte[] rowKeyPrefixStopRow;
- Filter rowKeyPrefixFilter;
-
- public static boolean isCompareFunction(String functionName) {
- return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
- }
-
- public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
- String functionName = call.getName();
- LogicalExpression nameArg = call.args.get(0);
- LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
- CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
-
- if (valueArg != null) { // binary function
- if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
- LogicalExpression swapArg = valueArg;
- valueArg = nameArg;
- nameArg = swapArg;
- evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
- }
- evaluator.success = nameArg.accept(evaluator, valueArg);
- } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
- evaluator.success = true;
- evaluator.path = (SchemaPath) nameArg;
- }
-
- return evaluator;
- }
-
- public CompareFunctionsProcessor(String functionName) {
- this.success = false;
- this.functionName = functionName;
- this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
- && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
- this.isRowKeyPrefixComparison = false;
- this.sortOrderAscending = true;
- }
-
- public byte[] getValue() {
- return value;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public SchemaPath getPath() {
- return path;
- }
-
- public String getFunctionName() {
- return functionName;
- }
-
- public boolean isRowKeyPrefixComparison() {
- return isRowKeyPrefixComparison;
- }
-
- public byte[] getRowKeyPrefixStartRow() {
- return rowKeyPrefixStartRow;
- }
-
- public byte[] getRowKeyPrefixStopRow() {
- return rowKeyPrefixStopRow;
- }
-
- public Filter getRowKeyPrefixFilter() {
- return rowKeyPrefixFilter;
- }
-
- public boolean isSortOrderAscending() {
- return sortOrderAscending;
- }
-
- @Override
- public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
- if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
- return e.getInput().accept(this, valueArg);
- }
- return false;
- }
-
- @Override
- public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
- if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) {
-
- String encodingType = e.getEncodingType();
- int prefixLength = 0;
-
- // Handle scan pruning in the following scenario:
- // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
- // querying for the first few bytes of the row-key(start-offset 1)
- // Example WHERE clause:
- // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
- if (e.getInput() instanceof FunctionCall) {
-
- // We can prune scan range only for big-endian encoded data
- if (encodingType.endsWith("_BE") == false) {
- return false;
- }
-
- FunctionCall call = (FunctionCall)e.getInput();
- String functionName = call.getName();
- if (!functionName.equalsIgnoreCase("byte_substr")) {
- return false;
- }
-
- LogicalExpression nameArg = call.args.get(0);
- LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
- LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
-
- if (((nameArg instanceof SchemaPath) == false) ||
- (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) ||
- (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) {
- return false;
- }
-
- boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
- int offset = ((IntExpression)valueArg1).getInt();
-
- if (!isRowKey || (offset != 1)) {
- return false;
- }
-
- this.path = (SchemaPath)nameArg;
- prefixLength = ((IntExpression)valueArg2).getInt();
- this.isRowKeyPrefixComparison = true;
- return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
- }
-
- if (e.getInput() instanceof SchemaPath) {
- ByteBuf bb = null;
-
- switch (encodingType) {
- case "INT_BE":
- case "INT":
- case "UINT_BE":
- case "UINT":
- case "UINT4_BE":
- case "UINT4":
- if (valueArg instanceof IntExpression
- && (isEqualityFn || encodingType.startsWith("U"))) {
- bb = newByteBuf(4, encodingType.endsWith("_BE"));
- bb.writeInt(((IntExpression)valueArg).getInt());
- }
- break;
- case "BIGINT_BE":
- case "BIGINT":
- case "UINT8_BE":
- case "UINT8":
- if (valueArg instanceof LongExpression
- && (isEqualityFn || encodingType.startsWith("U"))) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((LongExpression)valueArg).getLong());
- }
- break;
- case "FLOAT":
- if (valueArg instanceof FloatExpression && isEqualityFn) {
- bb = newByteBuf(4, true);
- bb.writeFloat(((FloatExpression)valueArg).getFloat());
- }
- break;
- case "DOUBLE":
- if (valueArg instanceof DoubleExpression && isEqualityFn) {
- bb = newByteBuf(8, true);
- bb.writeDouble(((DoubleExpression)valueArg).getDouble());
- }
- break;
- case "TIME_EPOCH":
- case "TIME_EPOCH_BE":
- if (valueArg instanceof TimeExpression) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((TimeExpression)valueArg).getTime());
- }
- break;
- case "DATE_EPOCH":
- case "DATE_EPOCH_BE":
- if (valueArg instanceof DateExpression) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((DateExpression)valueArg).getDate());
- }
- break;
- case "BOOLEAN_BYTE":
- if (valueArg instanceof BooleanExpression) {
- bb = newByteBuf(1, false /* does not matter */);
- bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
- }
- break;
- case "DOUBLE_OB":
- case "DOUBLE_OBD":
- if (valueArg instanceof DoubleExpression) {
- bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
- ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
- ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING);
- }
- }
- break;
- case "FLOAT_OB":
- case "FLOAT_OBD":
- if (valueArg instanceof FloatExpression) {
- bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
- ((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
- ((FloatExpression)valueArg).getFloat(), Order.ASCENDING);
- }
- }
- break;
- case "BIGINT_OB":
- case "BIGINT_OBD":
- if (valueArg instanceof LongExpression) {
- bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
- ((LongExpression)valueArg).getLong(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
- ((LongExpression)valueArg).getLong(), Order.ASCENDING);
- }
- }
- break;
- case "INT_OB":
- case "INT_OBD":
- if (valueArg instanceof IntExpression) {
- bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
- ((IntExpression)valueArg).getInt(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
- ((IntExpression)valueArg).getInt(), Order.ASCENDING);
- }
- }
- break;
- case "UTF8_OB":
- case "UTF8_OBD":
- if (valueArg instanceof QuotedString) {
- int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
- bb = newByteBuf(stringLen + 2, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
- ((QuotedString)valueArg).value, Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
- ((QuotedString)valueArg).value, Order.ASCENDING);
- }
- }
- break;
- case "UTF8":
- // let visitSchemaPath() handle this.
- return e.getInput().accept(this, valueArg);
- }
-
- if (bb != null) {
- this.value = bb.array();
- this.path = (SchemaPath)e.getInput();
- return true;
- }
- }
- }
- return false;
- }
-
- private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
- int prefixLength, LogicalExpression valueArg) {
- String encodingType = e.getEncodingType();
- rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
- rowKeyPrefixStopRow = HConstants.EMPTY_START_ROW;
- rowKeyPrefixFilter = null;
-
- if ((encodingType.compareTo("UINT4_BE") == 0) ||
- (encodingType.compareTo("UINT_BE") == 0)) {
- if (prefixLength != 4) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- int val;
- if ((valueArg instanceof IntExpression) == false) {
- return false;
- }
-
- val = ((IntExpression)valueArg).getInt();
-
- // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
- switch (functionName) {
- case "equal":
- rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "greater_than_or_equal_to":
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
- return true;
- case "greater_than":
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "less_than_or_equal_to":
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "less_than":
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
- return true;
- }
-
- return false;
- }
-
- if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
- (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
- (encodingType.compareTo("UINT8_BE") == 0)) {
-
- if (prefixLength != 8) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- long val;
- if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
- if ((valueArg instanceof TimeExpression) == false) {
- return false;
- }
-
- val = ((TimeExpression)valueArg).getTime();
- } else if (encodingType.compareTo("UINT8_BE") == 0){
- if ((valueArg instanceof LongExpression) == false) {
- return false;
- }
-
- val = ((LongExpression)valueArg).getLong();
- } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
- if ((valueArg instanceof TimeStampExpression) == false) {
- return false;
- }
-
- val = ((TimeStampExpression)valueArg).getTimeStamp();
- } else {
- // Should not reach here.
- return false;
- }
-
- // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
- switch (functionName) {
- case "equal":
- rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "greater_than_or_equal_to":
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
- return true;
- case "greater_than":
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "less_than_or_equal_to":
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "less_than":
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
- return true;
- }
-
- return false;
- }
-
- if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
- if ((valueArg instanceof DateExpression) == false) {
- return false;
- }
-
- if (prefixLength != 8) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
- long dateToSet;
- // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >=
- switch (functionName) {
- case "equal":
- long startDate = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
- long stopDate = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
- return true;
- case "greater_than_or_equal_to":
- dateToSet = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "greater_than":
- dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "less_than_or_equal_to":
- dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "less_than":
- dateToSet = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- }
-
- return false;
- }
-
- return false;
- }
-
- @Override
- public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
- return false;
- }
-
- @Override
- public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
- if (valueArg instanceof QuotedString) {
- this.value = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8);
- this.path = path;
- return true;
- }
- return false;
- }
-
- private static ByteBuf newByteBuf(int size, boolean bigEndian) {
- return Unpooled.wrappedBuffer(new byte[size])
- .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
- .writerIndex(0);
- }
-
- private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
- static {
- ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
- VALUE_EXPRESSION_CLASSES = builder
- .add(BooleanExpression.class)
- .add(DateExpression.class)
- .add(DoubleExpression.class)
- .add(FloatExpression.class)
- .add(IntExpression.class)
- .add(LongExpression.class)
- .add(QuotedString.class)
- .add(TimeExpression.class)
- .build();
- }
-
- private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
- static {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
- // unary functions
- .put("isnotnull", "isnotnull")
- .put("isNotNull", "isNotNull")
- .put("is not null", "is not null")
- .put("isnull", "isnull")
- .put("isNull", "isNull")
- .put("is null", "is null")
- // binary functions
- .put("like", "like")
- .put("equal", "equal")
- .put("not_equal", "not_equal")
- .put("greater_than_or_equal_to", "less_than_or_equal_to")
- .put("greater_than", "less_than")
- .put("less_than_or_equal_to", "greater_than_or_equal_to")
- .put("less_than", "greater_than")
- .build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
deleted file mode 100644
index 857d799..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.Arrays;
-
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseRegexParser;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.hbase.HBaseUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.NullComparator;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-
-public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
-
- final private MapRDBGroupScan groupScan;
-
- final private LogicalExpression le;
-
- private boolean allExpressionsConverted = true;
-
- private static Boolean nullComparatorSupported = null;
-
- MapRDBFilterBuilder(MapRDBGroupScan groupScan, LogicalExpression le) {
- this.groupScan = groupScan;
- this.le = le;
- }
-
- public HBaseScanSpec parseTree() {
- HBaseScanSpec parsedSpec = le.accept(this, null);
- if (parsedSpec != null) {
- parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
- /*
- * If RowFilter is THE filter attached to the scan specification,
- * remove it since its effect is also achieved through startRow and stopRow.
- */
- Filter filter = parsedSpec.getFilter();
- if (filter instanceof RowFilter &&
- ((RowFilter)filter).getOperator() != CompareOp.NOT_EQUAL &&
- ((RowFilter)filter).getComparator() instanceof BinaryComparator) {
- filter = null;
- }
- }
- return parsedSpec;
- }
-
- public boolean isAllExpressionsConverted() {
- return allExpressionsConverted;
- }
-
- @Override
- public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
- allExpressionsConverted = false;
- return null;
- }
-
- @Override
- public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
- return visitFunctionCall(op, value);
- }
-
- @Override
- public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
- HBaseScanSpec nodeScanSpec = null;
- String functionName = call.getName();
- ImmutableList<LogicalExpression> args = call.args;
-
- if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
- /*
- * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
- * causes a filter with NullComparator to fail. Enable only if specified in
- * the configuration (after ensuring that the HBase cluster has the fix).
- */
- if (nullComparatorSupported == null) {
- nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
- }
-
- CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
- if (processor.isSuccess()) {
- nodeScanSpec = createHBaseScanSpec(call, processor);
- }
- } else {
- switch (functionName) {
- case "booleanAnd":
- case "booleanOr":
- HBaseScanSpec firstScanSpec = args.get(0).accept(this, null);
- for (int i = 1; i < args.size(); ++i) {
- HBaseScanSpec nextScanSpec = args.get(i).accept(this, null);
- if (firstScanSpec != null && nextScanSpec != null) {
- nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
- } else {
- allExpressionsConverted = false;
- if ("booleanAnd".equals(functionName)) {
- nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
- }
- }
- firstScanSpec = nodeScanSpec;
- }
- break;
- }
- }
-
- if (nodeScanSpec == null) {
- allExpressionsConverted = false;
- }
-
- return nodeScanSpec;
- }
-
- private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
- Filter newFilter = null;
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
-
- switch (functionName) {
- case "booleanAnd":
- newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
- startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
- stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
- break;
- case "booleanOr":
- newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
- startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
- stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
- }
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
- }
-
- private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) {
- String functionName = processor.getFunctionName();
- SchemaPath field = processor.getPath();
- byte[] fieldValue = processor.getValue();
- boolean sortOrderAscending = processor.isSortOrderAscending();
- boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
- if (!(isRowKey
- || (!field.getRootSegment().isLastPath()
- && field.getRootSegment().getChild().isLastPath()
- && field.getRootSegment().getChild().isNamed())
- )
- ) {
- /*
- * if the field in this function is neither the row_key nor a qualified HBase column, return.
- */
- return null;
- }
-
- if (processor.isRowKeyPrefixComparison()) {
- return createRowKeyPrefixScanSpec(call, processor);
- }
-
- CompareOp compareOp = null;
- boolean isNullTest = false;
- ByteArrayComparable comparator = new BinaryComparator(fieldValue);
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
- switch (functionName) {
- case "equal":
- compareOp = CompareOp.EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- /* stopRow should be just greater than 'value'*/
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- compareOp = CompareOp.EQUAL;
- }
- break;
- case "not_equal":
- compareOp = CompareOp.NOT_EQUAL;
- break;
- case "greater_than_or_equal_to":
- if (sortOrderAscending) {
- compareOp = CompareOp.GREATER_OR_EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- }
- } else {
- compareOp = CompareOp.LESS_OR_EQUAL;
- if (isRowKey) {
- // stopRow should be just greater than 'value'
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- }
- break;
- case "greater_than":
- if (sortOrderAscending) {
- compareOp = CompareOp.GREATER;
- if (isRowKey) {
- // startRow should be just greater than 'value'
- startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- } else {
- compareOp = CompareOp.LESS;
- if (isRowKey) {
- stopRow = fieldValue;
- }
- }
- break;
- case "less_than_or_equal_to":
- if (sortOrderAscending) {
- compareOp = CompareOp.LESS_OR_EQUAL;
- if (isRowKey) {
- // stopRow should be just greater than 'value'
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- } else {
- compareOp = CompareOp.GREATER_OR_EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- }
- }
- break;
- case "less_than":
- if (sortOrderAscending) {
- compareOp = CompareOp.LESS;
- if (isRowKey) {
- stopRow = fieldValue;
- }
- } else {
- compareOp = CompareOp.GREATER;
- if (isRowKey) {
- // startRow should be just greater than 'value'
- startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- }
- break;
- case "isnull":
- case "isNull":
- case "is null":
- if (isRowKey) {
- return null;
- }
- isNullTest = true;
- compareOp = CompareOp.EQUAL;
- comparator = new NullComparator();
- break;
- case "isnotnull":
- case "isNotNull":
- case "is not null":
- if (isRowKey) {
- return null;
- }
- compareOp = CompareOp.NOT_EQUAL;
- comparator = new NullComparator();
- break;
- case "like":
- /*
- * Convert the LIKE operand to Regular Expression pattern so that we can
- * apply RegexStringComparator()
- */
- HBaseRegexParser parser = new HBaseRegexParser(call).parse();
- compareOp = CompareOp.EQUAL;
- comparator = new RegexStringComparator(parser.getRegexString());
-
- /*
- * We can possibly do better if the LIKE operator is on the row_key
- */
- if (isRowKey) {
- String prefix = parser.getPrefixString();
- if (prefix != null) { // group 3 is literal
- /*
- * If there is a literal prefix, it can help us prune the scan to a sub range
- */
- if (prefix.equals(parser.getLikeString())) {
- /* The operand value is literal. This turns the LIKE operator to EQUAL operator */
- startRow = stopRow = fieldValue;
- compareOp = null;
- } else {
- startRow = prefix.getBytes(Charsets.UTF_8);
- stopRow = startRow.clone();
- boolean isMaxVal = true;
- for (int i = stopRow.length - 1; i >= 0 ; --i) {
- int nextByteValue = (0xff & stopRow[i]) + 1;
- if (nextByteValue < 0xff) {
- stopRow[i] = (byte) nextByteValue;
- isMaxVal = false;
- break;
- } else {
- stopRow[i] = 0;
- }
- }
- if (isMaxVal) {
- stopRow = HConstants.EMPTY_END_ROW;
- }
- }
- }
- }
- break;
- }
-
- if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
- Filter filter = null;
- if (isRowKey) {
- if (compareOp != null) {
- filter = new RowFilter(compareOp, comparator);
- }
- } else {
- byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
- byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
- filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
- ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
- if (!isNullTest) {
- ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
- }
- }
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
- }
- // else
- return null;
- }
-
- private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
- CompareFunctionsProcessor processor) {
- byte[] startRow = processor.getRowKeyPrefixStartRow();
- byte[] stopRow = processor.getRowKeyPrefixStopRow();
- Filter filter = processor.getRowKeyPrefixFilter();
-
- if (startRow != HConstants.EMPTY_START_ROW ||
- stopRow != HConstants.EMPTY_END_ROW ||
- filter != null) {
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
- }
-
- // else
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
index aa91901..d22434d 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
@@ -17,7 +17,11 @@
*/
package org.apache.drill.exec.store.maprdb;
+import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX;
+
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
@@ -31,26 +35,36 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.maprdb.binary.MapRDBPushFilterIntoScan;
+import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableSet;
+import com.mapr.fs.MapRFileSystem;
+import com.mapr.fs.tables.TableProperties;
public class MapRDBFormatPlugin implements FormatPlugin {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
.getLogger(MapRDBFormatPlugin.class);
- private final StoragePluginConfig storageConfig;
+ private final FileSystemConfig storageConfig;
private final MapRDBFormatPluginConfig config;
private final MapRDBFormatMatcher matcher;
private final Configuration fsConf;
private final DrillbitContext context;
private final String name;
+ private volatile FileSystemPlugin storagePlugin;
+ private volatile MapRFileSystem maprfs;
+
public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig) {
this(name, context, fsConf, storageConfig, new MapRDBFormatPluginConfig());
@@ -61,9 +75,15 @@ public class MapRDBFormatPlugin implements FormatPlugin {
this.context = context;
this.config = formatConfig;
this.matcher = new MapRDBFormatMatcher(this);
- this.storageConfig = storageConfig;
+ this.storageConfig = (FileSystemConfig) storageConfig;
this.fsConf = fsConf;
this.name = name == null ? "maprdb" : name;
+ try {
+ this.maprfs = new MapRFileSystem();
+ maprfs.initialize(new URI(MAPRFS_PREFIX), fsConf);
+ } catch (IOException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -108,14 +128,14 @@ public class MapRDBFormatPlugin implements FormatPlugin {
List<String> files = selection.getFiles();
assert (files.size() == 1);
String tableName = files.get(0);
- HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
- try {
- return new MapRDBGroupScan(userName,
- (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig)), this, scanSpec,
- columns);
- } catch (ExecutionSetupException e) {
- e.printStackTrace();
- return null;
+ TableProperties props = maprfs.getTableProperties(new Path(tableName));
+
+ if (props.getAttr().getJson()) {
+ MapRDBSubScanSpec scanSpec = new MapRDBSubScanSpec().setTableName(tableName);
+ return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+ } else {
+ HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
+ return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
}
}
@@ -139,4 +159,15 @@ public class MapRDBFormatPlugin implements FormatPlugin {
return name;
}
+ public synchronized FileSystemPlugin getStoragePlugin() {
+ if (this.storagePlugin == null) {
+ try {
+ this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
+ } catch (ExecutionSetupException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return storagePlugin;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
index 9358753..cbfb18c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.maprdb;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -30,163 +29,64 @@ import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.codehaus.jackson.annotate.JsonCreator;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
+
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-@JsonTypeName("maprdb-scan")
-public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
-
- private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
- @Override
- public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
- return list1.size() - list2.size();
- }
- };
-
- private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
-
- private List<SchemaPath> columns;
-
- private HBaseScanSpec hbaseScanSpec;
+public abstract class MapRDBGroupScan extends AbstractGroupScan {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
private FileSystemPlugin storagePlugin;
private MapRDBFormatPlugin formatPlugin;
- private Stopwatch watch = Stopwatch.createUnstarted();
+ protected List<SchemaPath> columns;
- private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
+ protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
- private NavigableMap<HRegionInfo, ServerName> regionsToScan;
-
- private HTableDescriptor hTableDesc;
+ protected NavigableMap<TabletFragmentInfo, String> regionsToScan;
private boolean filterPushedDown = false;
- private MapRDBTableStats tableStats;
-
- @JsonCreator
- public MapRDBGroupScan(@JsonProperty("userName") final String userName,
- @JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
- @JsonProperty("storage") FileSystemConfig storagePluginConfig,
- @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName, (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
- hbaseScanSpec,
- columns);
- }
+ private Stopwatch watch = new Stopwatch();
- public MapRDBGroupScan(String userName, FileSystemPlugin storagePlugin, MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
- super(userName);
- this.storagePlugin = storagePlugin;
- this.formatPlugin = formatPlugin;
- this.hbaseScanSpec = scanSpec;
- this.columns = columns;
- init();
- }
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
+ @Override
+ public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
+ return list1.size() - list2.size();
+ }
+ };
- /**
- * Private constructor, used for cloning.
- * @param that The HBaseGroupScan to clone
- */
- private MapRDBGroupScan(MapRDBGroupScan that) {
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
+ public MapRDBGroupScan(MapRDBGroupScan that) {
super(that);
this.columns = that.columns;
- this.hbaseScanSpec = that.hbaseScanSpec;
- this.endpointFragmentMapping = that.endpointFragmentMapping;
- this.regionsToScan = that.regionsToScan;
- this.storagePlugin = that.storagePlugin;
this.formatPlugin = that.formatPlugin;
- this.hTableDesc = that.hTableDesc;
+ this.storagePlugin = that.storagePlugin;
+ this.regionsToScan = that.regionsToScan;
this.filterPushedDown = that.filterPushedDown;
- this.tableStats = that.tableStats;
- }
-
- @Override
- public GroupScan clone(List<SchemaPath> columns) {
- MapRDBGroupScan newScan = new MapRDBGroupScan(this);
- newScan.columns = columns;
- newScan.verifyColumns();
- return newScan;
- }
-
- private void init() {
- logger.debug("Getting region locations");
- try {
- HTable table = new HTable(HBaseConfiguration.create(), hbaseScanSpec.getTableName());
- tableStats = new MapRDBTableStats(table);
- this.hTableDesc = table.getTableDescriptor();
- NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
- table.close();
-
- boolean foundStartRegion = false;
- regionsToScan = new TreeMap<HRegionInfo, ServerName>();
- for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
- HRegionInfo regionInfo = mapEntry.getKey();
- if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
- continue;
- }
- foundStartRegion = true;
- regionsToScan.put(regionInfo, mapEntry.getValue());
- if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
- break;
- }
- }
- } catch (Exception e) {
- throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
- }
- verifyColumns();
}
- private void verifyColumns() {
- /*
- if (columns != null) {
- for (SchemaPath column : columns) {
- if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
- DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
- column.getRootSegment().getPath(), hTableDesc.getNameAsString());
- }
- }
- }
- */
+ public MapRDBGroupScan(FileSystemPlugin storagePlugin,
+ MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
+ super(userName);
+ this.storagePlugin = storagePlugin;
+ this.formatPlugin = formatPlugin;
+ this.columns = columns;
}
@Override
@@ -199,8 +99,8 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
}
Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
- for (ServerName sn : regionsToScan.values()) {
- DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
+ for (String serverName : regionsToScan.values()) {
+ DrillbitEndpoint ep = endpointMap.get(serverName);
if (ep != null) {
EndpointAffinity affinity = affinityMap.get(ep);
if (affinity == null) {
@@ -247,7 +147,7 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
* Initialize these two maps
*/
for (int i = 0; i < numSlots; ++i) {
- endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+ endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot));
String hostname = incomingEndpoints.get(i).getAddress();
Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
if (hostIndexQueue == null) {
@@ -257,21 +157,21 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
hostIndexQueue.add(i);
}
- Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+ Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
/*
* First, we assign regions which are hosted on region servers running on drillbit endpoints
*/
- for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
- Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next();
+ for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+ Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next();
/*
* Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
*/
- Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+ Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue());
if (endpointIndexlist != null) {
Integer slotIndex = endpointIndexlist.poll();
- List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
- endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+ List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+ endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
// add to the tail of the slot list, to add more later in round robin fashion
endpointIndexlist.offer(slotIndex);
// this region has been assigned
@@ -282,9 +182,9 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
/*
* Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
*/
- PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
- for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+ for(List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
if (listOfScan.size() < minPerEndpointSlot) {
minHeap.offer(listOfScan);
} else if (listOfScan.size() > minPerEndpointSlot){
@@ -296,9 +196,9 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
* Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
*/
if (regionsToAssignSet.size() > 0) {
- for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) {
- List<HBaseSubScanSpec> smallestList = minHeap.poll();
- smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+ for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
+ List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+ smallestList.add(getSubScanSpec(regionEntry.getKey()));
if (smallestList.size() < maxPerEndpointSlot) {
minHeap.offer(smallestList);
}
@@ -309,8 +209,8 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
* While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
*/
while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
- List<HBaseSubScanSpec> smallestList = minHeap.poll();
- List<HBaseSubScanSpec> largestList = maxHeap.poll();
+ List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll();
+ List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll();
smallestList.add(largestList.remove(largestList.size()-1));
if (largestList.size() > minPerEndpointSlot) {
maxHeap.offer(largestList);
@@ -329,77 +229,21 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
}
- private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
- HBaseScanSpec spec = hbaseScanSpec;
- HBaseSubScanSpec subScanSpec = new HBaseSubScanSpec(spec.getTableName(),
- regionsToScan.get(ri).getHostname(),
- (!isNullOrEmpty(spec.getStartRow()) && ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey(),
- (!isNullOrEmpty(spec.getStopRow()) && ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey(),
- spec.getSerializedFilter(),
- null);
- return subScanSpec;
- }
-
- private boolean isNullOrEmpty(byte[] key) {
- return key == null || key.length == 0;
- }
-
- @Override
- public MapRDBSubScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < endpointFragmentMapping.size() : String.format(
- "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
- minorFragmentId);
- return new MapRDBSubScan(getUserName(), storagePlugin, storagePlugin.getConfig(), endpointFragmentMapping.get(minorFragmentId), columns);
- }
-
@Override
public int getMaxParallelizationWidth() {
return regionsToScan.size();
}
- @Override
- public ScanStats getScanStats() {
- //TODO: look at stats for this.
- long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
- int avgColumnSize = 10;
- int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new MapRDBGroupScan(this);
- }
-
@JsonIgnore
public MapRDBFormatPlugin getFormatPlugin() {
return formatPlugin;
}
- @JsonIgnore
- public Configuration getHBaseConf() {
- return HBaseConfiguration.create();
- }
-
- @JsonIgnore
- public String getTableName() {
- return getHBaseScanSpec().getTableName();
- }
-
@Override
public String getDigest() {
return toString();
}
- @Override
- public String toString() {
- return "MapRDBGroupScan [HBaseScanSpec="
- + hbaseScanSpec + ", columns="
- + columns + "]";
- }
-
@JsonProperty("storage")
public FileSystemConfig getStorageConfig() {
return (FileSystemConfig) storagePlugin.getConfig();
@@ -407,7 +251,7 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
@JsonIgnore
public FileSystemPlugin getStoragePlugin(){
- return storagePlugin;
+ return storagePlugin;
}
@JsonProperty
@@ -415,11 +259,6 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
return columns;
}
- @JsonProperty
- public HBaseScanSpec getHBaseScanSpec() {
- return hbaseScanSpec;
- }
-
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
@@ -435,29 +274,6 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
return filterPushedDown;
}
- /**
- * Empty constructor, do not use, only for testing.
- */
- @VisibleForTesting
- public MapRDBGroupScan() {
- super((String)null);
- }
-
- /**
- * Do not use, only for testing.
- */
- @VisibleForTesting
- public void setHBaseScanSpec(HBaseScanSpec hbaseScanSpec) {
- this.hbaseScanSpec = hbaseScanSpec;
- }
-
- /**
- * Do not use, only for testing.
- */
- @JsonIgnore
- @VisibleForTesting
- public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> regionsToScan) {
- this.regionsToScan = regionsToScan;
- }
+ protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
deleted file mode 100644
index 50f3d95..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.FilterPrel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.collect.ImmutableList;
-
-public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
-
-// public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan();
-
- private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
- super(operand, description);
- }
-
- public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(1);
- final FilterPrel filter = (FilterPrel) call.rel(0);
- final RexNode condition = filter.getCondition();
-
- MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
- if (groupScan.isFilterPushedDown()) {
- /*
- * The rule can get triggered again due to the transformed "scan => filter" sequence
- * created by the earlier execution of this rule when we could not do a complete
- * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
- * this flag to not do a re-processing of the rule on the already transformed call.
- */
- return;
- }
-
- doPushFilterToScan(call, filter, null, scan, groupScan, condition);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(1);
- if (scan.getGroupScan() instanceof MapRDBGroupScan) {
- return super.matches(call);
- }
- return false;
- }
- };
-
- public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(2);
- final ProjectPrel project = (ProjectPrel) call.rel(1);
- final FilterPrel filter = (FilterPrel) call.rel(0);
-
- MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
- if (groupScan.isFilterPushedDown()) {
- /*
- * The rule can get triggered again due to the transformed "scan => filter" sequence
- * created by the earlier execution of this rule when we could not do a complete
- * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
- * this flag to not do a re-processing of the rule on the already transformed call.
- */
- return;
- }
-
- // convert the filter to one that references the child of the project
- final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
-
- doPushFilterToScan(call, filter, project, scan, groupScan, condition);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(2);
- if (scan.getGroupScan() instanceof MapRDBGroupScan) {
- return super.matches(call);
- }
- return false;
- }
- };
-
- protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final MapRDBGroupScan groupScan, final RexNode condition) {
-
- final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
- final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
- final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
- if (newScanSpec == null) {
- return; //no filter pushdown ==> No transformation.
- }
-
- final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
- newGroupsScan.setFilterPushedDown(true);
-
- final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
- // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
- final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
- if (maprdbFilterBuilder.isAllExpressionsConverted()) {
- /*
- * Since we could convert the entire filter condition expression into an HBase filter,
- * we can eliminate the filter operator altogether.
- */
- call.transformTo(childRel);
- } else {
- call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
index 1beabc9..058de61 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
@@ -26,7 +26,10 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.hbase.HBaseRecordReader;
-import org.apache.drill.exec.store.hbase.HBaseSubScan;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
+import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.maprdb.json.MaprDBJsonRecordReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import com.google.common.base.Preconditions;
@@ -39,11 +42,14 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
- for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
+ Configuration conf = HBaseConfiguration.create();
+ for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
- readers.add(
- new HBaseRecordReader(HBaseConfiguration.create(), scanSpec, subScan.getColumns(), context)
- );
+ if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
+ readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
+ } else {
+ readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getColumns(), context));
+ }
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
}
@@ -51,4 +57,9 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
return new ScanBatch(subScan, context, readers.iterator());
}
+ private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {
+ return new HBaseSubScanSpec(scanSpec.getTableName(), scanSpec.getRegionServer(),
+ scanSpec.getStartRow(), scanSpec.getStopRow(), scanSpec.getSerializedFilter(), null);
+ }
+
}