You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:25 UTC
[38/50] [abbrv] drill git commit: Refactoring code for better
organization.
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
new file mode 100644
index 0000000..0c901d7
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.binary;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hbase.util.Order;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+ private byte[] value;
+ private boolean success;
+ private boolean isEqualityFn;
+ private SchemaPath path;
+ private String functionName;
+ private boolean sortOrderAscending;
+
+ // Fields for row-key prefix comparison
+ // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter
+ // Hence, we use these local variables(set depending upon the encoding type in user query)
+ private boolean isRowKeyPrefixComparison;
+ byte[] rowKeyPrefixStartRow;
+ byte[] rowKeyPrefixStopRow;
+ Filter rowKeyPrefixFilter;
+
+ public static boolean isCompareFunction(String functionName) {
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+ }
+
+ public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
+ String functionName = call.getName();
+ LogicalExpression nameArg = call.args.get(0);
+ LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
+ CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
+
+ if (valueArg != null) { // binary function
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+ LogicalExpression swapArg = valueArg;
+ valueArg = nameArg;
+ nameArg = swapArg;
+ evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+ }
+ evaluator.success = nameArg.accept(evaluator, valueArg);
+ } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
+ evaluator.success = true;
+ evaluator.path = (SchemaPath) nameArg;
+ }
+
+ return evaluator;
+ }
+
+ public CompareFunctionsProcessor(String functionName) {
+ this.success = false;
+ this.functionName = functionName;
+ this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
+ && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+ this.isRowKeyPrefixComparison = false;
+ this.sortOrderAscending = true;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public SchemaPath getPath() {
+ return path;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ public boolean isRowKeyPrefixComparison() {
+ return isRowKeyPrefixComparison;
+ }
+
+ public byte[] getRowKeyPrefixStartRow() {
+ return rowKeyPrefixStartRow;
+ }
+
+ public byte[] getRowKeyPrefixStopRow() {
+ return rowKeyPrefixStopRow;
+ }
+
+ public Filter getRowKeyPrefixFilter() {
+ return rowKeyPrefixFilter;
+ }
+
+ public boolean isSortOrderAscending() {
+ return sortOrderAscending;
+ }
+
+ @Override
+ public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+ if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+ return e.getInput().accept(this, valueArg);
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
+ if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) {
+
+ String encodingType = e.getEncodingType();
+ int prefixLength = 0;
+
+ // Handle scan pruning in the following scenario:
+ // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
+ // querying for the first few bytes of the row-key(start-offset 1)
+ // Example WHERE clause:
+ // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
+ if (e.getInput() instanceof FunctionCall) {
+
+ // We can prune scan range only for big-endian encoded data
+ if (encodingType.endsWith("_BE") == false) {
+ return false;
+ }
+
+ FunctionCall call = (FunctionCall)e.getInput();
+ String functionName = call.getName();
+ if (!functionName.equalsIgnoreCase("byte_substr")) {
+ return false;
+ }
+
+ LogicalExpression nameArg = call.args.get(0);
+ LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
+ LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
+
+ if (((nameArg instanceof SchemaPath) == false) ||
+ (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) ||
+ (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) {
+ return false;
+ }
+
+ boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
+ int offset = ((IntExpression)valueArg1).getInt();
+
+ if (!isRowKey || (offset != 1)) {
+ return false;
+ }
+
+ this.path = (SchemaPath)nameArg;
+ prefixLength = ((IntExpression)valueArg2).getInt();
+ this.isRowKeyPrefixComparison = true;
+ return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
+ }
+
+ if (e.getInput() instanceof SchemaPath) {
+ ByteBuf bb = null;
+
+ switch (encodingType) {
+ case "INT_BE":
+ case "INT":
+ case "UINT_BE":
+ case "UINT":
+ case "UINT4_BE":
+ case "UINT4":
+ if (valueArg instanceof IntExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ bb = newByteBuf(4, encodingType.endsWith("_BE"));
+ bb.writeInt(((IntExpression)valueArg).getInt());
+ }
+ break;
+ case "BIGINT_BE":
+ case "BIGINT":
+ case "UINT8_BE":
+ case "UINT8":
+ if (valueArg instanceof LongExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
+ bb.writeLong(((LongExpression)valueArg).getLong());
+ }
+ break;
+ case "FLOAT":
+ if (valueArg instanceof FloatExpression && isEqualityFn) {
+ bb = newByteBuf(4, true);
+ bb.writeFloat(((FloatExpression)valueArg).getFloat());
+ }
+ break;
+ case "DOUBLE":
+ if (valueArg instanceof DoubleExpression && isEqualityFn) {
+ bb = newByteBuf(8, true);
+ bb.writeDouble(((DoubleExpression)valueArg).getDouble());
+ }
+ break;
+ case "TIME_EPOCH":
+ case "TIME_EPOCH_BE":
+ if (valueArg instanceof TimeExpression) {
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
+ bb.writeLong(((TimeExpression)valueArg).getTime());
+ }
+ break;
+ case "DATE_EPOCH":
+ case "DATE_EPOCH_BE":
+ if (valueArg instanceof DateExpression) {
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
+ bb.writeLong(((DateExpression)valueArg).getDate());
+ }
+ break;
+ case "BOOLEAN_BYTE":
+ if (valueArg instanceof BooleanExpression) {
+ bb = newByteBuf(1, false /* does not matter */);
+ bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
+ }
+ break;
+ case "DOUBLE_OB":
+ case "DOUBLE_OBD":
+ if (valueArg instanceof DoubleExpression) {
+ bb = newByteBuf(9, true);
+ PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+ if (encodingType.endsWith("_OBD")) {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
+ ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
+ this.sortOrderAscending = false;
+ } else {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
+ ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING);
+ }
+ }
+ break;
+ case "FLOAT_OB":
+ case "FLOAT_OBD":
+ if (valueArg instanceof FloatExpression) {
+ bb = newByteBuf(5, true);
+ PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+ if (encodingType.endsWith("_OBD")) {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
+ ((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
+ this.sortOrderAscending = false;
+ } else {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
+ ((FloatExpression)valueArg).getFloat(), Order.ASCENDING);
+ }
+ }
+ break;
+ case "BIGINT_OB":
+ case "BIGINT_OBD":
+ if (valueArg instanceof LongExpression) {
+ bb = newByteBuf(9, true);
+ PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+ if (encodingType.endsWith("_OBD")) {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
+ ((LongExpression)valueArg).getLong(), Order.DESCENDING);
+ this.sortOrderAscending = false;
+ } else {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
+ ((LongExpression)valueArg).getLong(), Order.ASCENDING);
+ }
+ }
+ break;
+ case "INT_OB":
+ case "INT_OBD":
+ if (valueArg instanceof IntExpression) {
+ bb = newByteBuf(5, true);
+ PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+ if (encodingType.endsWith("_OBD")) {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
+ ((IntExpression)valueArg).getInt(), Order.DESCENDING);
+ this.sortOrderAscending = false;
+ } else {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
+ ((IntExpression)valueArg).getInt(), Order.ASCENDING);
+ }
+ }
+ break;
+ case "UTF8_OB":
+ case "UTF8_OBD":
+ if (valueArg instanceof QuotedString) {
+ int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
+ bb = newByteBuf(stringLen + 2, true);
+ PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
+ if (encodingType.endsWith("_OBD")) {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
+ ((QuotedString)valueArg).value, Order.DESCENDING);
+ this.sortOrderAscending = false;
+ } else {
+ org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
+ ((QuotedString)valueArg).value, Order.ASCENDING);
+ }
+ }
+ break;
+ case "UTF8":
+ // let visitSchemaPath() handle this.
+ return e.getInput().accept(this, valueArg);
+ }
+
+ if (bb != null) {
+ this.value = bb.array();
+ this.path = (SchemaPath)e.getInput();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
+ int prefixLength, LogicalExpression valueArg) {
+ String encodingType = e.getEncodingType();
+ rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
+ rowKeyPrefixStopRow = HConstants.EMPTY_START_ROW;
+ rowKeyPrefixFilter = null;
+
+ if ((encodingType.compareTo("UINT4_BE") == 0) ||
+ (encodingType.compareTo("UINT_BE") == 0)) {
+ if (prefixLength != 4) {
+ throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+ }
+
+ int val;
+ if ((valueArg instanceof IntExpression) == false) {
+ return false;
+ }
+
+ val = ((IntExpression)valueArg).getInt();
+
+ // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+ switch (functionName) {
+ case "equal":
+ rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
+ rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+ rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+ return true;
+ case "greater_than_or_equal_to":
+ rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+ return true;
+ case "greater_than":
+ rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+ return true;
+ case "less_than_or_equal_to":
+ rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+ return true;
+ case "less_than":
+ rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
+ return true;
+ }
+
+ return false;
+ }
+
+ if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
+ (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
+ (encodingType.compareTo("UINT8_BE") == 0)) {
+
+ if (prefixLength != 8) {
+ throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+ }
+
+ long val;
+ if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
+ if ((valueArg instanceof TimeExpression) == false) {
+ return false;
+ }
+
+ val = ((TimeExpression)valueArg).getTime();
+ } else if (encodingType.compareTo("UINT8_BE") == 0){
+ if ((valueArg instanceof LongExpression) == false) {
+ return false;
+ }
+
+ val = ((LongExpression)valueArg).getLong();
+ } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
+ if ((valueArg instanceof TimeStampExpression) == false) {
+ return false;
+ }
+
+ val = ((TimeStampExpression)valueArg).getTimeStamp();
+ } else {
+ // Should not reach here.
+ return false;
+ }
+
+ // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+ switch (functionName) {
+ case "equal":
+ rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+ return true;
+ case "greater_than_or_equal_to":
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
+ return true;
+ case "greater_than":
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+ return true;
+ case "less_than_or_equal_to":
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+ return true;
+ case "less_than":
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
+ return true;
+ }
+
+ return false;
+ }
+
+ if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
+ if ((valueArg instanceof DateExpression) == false) {
+ return false;
+ }
+
+ if (prefixLength != 8) {
+ throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+ }
+
+ final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
+ long dateToSet;
+ // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+ switch (functionName) {
+ case "equal":
+ long startDate = ((DateExpression)valueArg).getDate();
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
+ long stopDate = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
+ return true;
+ case "greater_than_or_equal_to":
+ dateToSet = ((DateExpression)valueArg).getDate();
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+ return true;
+ case "greater_than":
+ dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+ rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+ return true;
+ case "less_than_or_equal_to":
+ dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+ return true;
+ case "less_than":
+ dateToSet = ((DateExpression)valueArg).getDate();
+ rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+ return true;
+ }
+
+ return false;
+ }
+
+ return false;
+ }
+
+ @Override
+ public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+ if (valueArg instanceof QuotedString) {
+ this.value = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8);
+ this.path = path;
+ return true;
+ }
+ return false;
+ }
+
+ private static ByteBuf newByteBuf(int size, boolean bigEndian) {
+ return Unpooled.wrappedBuffer(new byte[size])
+ .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+ .writerIndex(0);
+ }
+
+ private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+ static {
+ ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+ VALUE_EXPRESSION_CLASSES = builder
+ .add(BooleanExpression.class)
+ .add(DateExpression.class)
+ .add(DoubleExpression.class)
+ .add(FloatExpression.class)
+ .add(IntExpression.class)
+ .add(LongExpression.class)
+ .add(QuotedString.class)
+ .add(TimeExpression.class)
+ .build();
+ }
+
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ // unary functions
+ .put("isnotnull", "isnotnull")
+ .put("isNotNull", "isNotNull")
+ .put("is not null", "is not null")
+ .put("isnull", "isnull")
+ .put("isNull", "isNull")
+ .put("is null", "is null")
+ // binary functions
+ .put("like", "like")
+ .put("equal", "equal")
+ .put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than")
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
new file mode 100644
index 0000000..3aba1e7
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.binary;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.hbase.HBaseRegexParser;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.hbase.HBaseUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+
+public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
+
+ final private BinaryTableGroupScan groupScan;
+
+ final private LogicalExpression le;
+
+ private boolean allExpressionsConverted = true;
+
+ private static Boolean nullComparatorSupported = null;
+
+ public MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
+ this.groupScan = groupScan;
+ this.le = le;
+ }
+
+ public HBaseScanSpec parseTree() {
+ HBaseScanSpec parsedSpec = le.accept(this, null);
+ if (parsedSpec != null) {
+ parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
+ /*
+ * If RowFilter is THE filter attached to the scan specification,
+ * remove it since its effect is also achieved through startRow and stopRow.
+ */
+ Filter filter = parsedSpec.getFilter();
+ if (filter instanceof RowFilter &&
+ ((RowFilter)filter).getOperator() != CompareOp.NOT_EQUAL &&
+ ((RowFilter)filter).getComparator() instanceof BinaryComparator) {
+ parsedSpec = new HBaseScanSpec(parsedSpec.getTableName(), parsedSpec.getStartRow(), parsedSpec.getStopRow(), null);
+ }
+ }
+ return parsedSpec;
+ }
+
+ public boolean isAllExpressionsConverted() {
+ return allExpressionsConverted;
+ }
+
+ @Override
+ public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+ allExpressionsConverted = false;
+ return null;
+ }
+
+ @Override
+ public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+ return visitFunctionCall(op, value);
+ }
+
+ @Override
+ public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+ HBaseScanSpec nodeScanSpec = null;
+ String functionName = call.getName();
+ ImmutableList<LogicalExpression> args = call.args;
+
+ if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+ /*
+ * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+ * causes a filter with NullComparator to fail. Enable only if specified in
+ * the configuration (after ensuring that the HBase cluster has the fix).
+ */
+ if (nullComparatorSupported == null) {
+ nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
+ }
+
+ CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
+ if (processor.isSuccess()) {
+ nodeScanSpec = createHBaseScanSpec(call, processor);
+ }
+ } else {
+ switch (functionName) {
+ case "booleanAnd":
+ case "booleanOr":
+ HBaseScanSpec firstScanSpec = args.get(0).accept(this, null);
+ for (int i = 1; i < args.size(); ++i) {
+ HBaseScanSpec nextScanSpec = args.get(i).accept(this, null);
+ if (firstScanSpec != null && nextScanSpec != null) {
+ nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
+ } else {
+ allExpressionsConverted = false;
+ if ("booleanAnd".equals(functionName)) {
+ nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
+ }
+ }
+ firstScanSpec = nodeScanSpec;
+ }
+ break;
+ }
+ }
+
+ if (nodeScanSpec == null) {
+ allExpressionsConverted = false;
+ }
+
+ return nodeScanSpec;
+ }
+
+ private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
+ Filter newFilter = null;
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+ switch (functionName) {
+ case "booleanAnd":
+ newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
+ startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
+ stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
+ break;
+ case "booleanOr":
+ newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
+ startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
+ stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
+ }
+ return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
+ }
+
+ private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) {
+ String functionName = processor.getFunctionName();
+ SchemaPath field = processor.getPath();
+ byte[] fieldValue = processor.getValue();
+ boolean sortOrderAscending = processor.isSortOrderAscending();
+ boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
+ if (!(isRowKey
+ || (!field.getRootSegment().isLastPath()
+ && field.getRootSegment().getChild().isLastPath()
+ && field.getRootSegment().getChild().isNamed())
+ )
+ ) {
+ /*
+ * if the field in this function is neither the row_key nor a qualified HBase column, return.
+ */
+ return null;
+ }
+
+ if (processor.isRowKeyPrefixComparison()) {
+ return createRowKeyPrefixScanSpec(call, processor);
+ }
+
+ CompareOp compareOp = null;
+ boolean isNullTest = false;
+ ByteArrayComparable comparator = new BinaryComparator(fieldValue);
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+ switch (functionName) {
+ case "equal":
+ compareOp = CompareOp.EQUAL;
+ if (isRowKey) {
+ startRow = fieldValue;
+ /* stopRow should be just greater than 'value'*/
+ stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ compareOp = CompareOp.EQUAL;
+ }
+ break;
+ case "not_equal":
+ compareOp = CompareOp.NOT_EQUAL;
+ break;
+ case "greater_than_or_equal_to":
+ if (sortOrderAscending) {
+ compareOp = CompareOp.GREATER_OR_EQUAL;
+ if (isRowKey) {
+ startRow = fieldValue;
+ }
+ } else {
+ compareOp = CompareOp.LESS_OR_EQUAL;
+ if (isRowKey) {
+ // stopRow should be just greater than 'value'
+ stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ }
+ }
+ break;
+ case "greater_than":
+ if (sortOrderAscending) {
+ compareOp = CompareOp.GREATER;
+ if (isRowKey) {
+ // startRow should be just greater than 'value'
+ startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ }
+ } else {
+ compareOp = CompareOp.LESS;
+ if (isRowKey) {
+ stopRow = fieldValue;
+ }
+ }
+ break;
+ case "less_than_or_equal_to":
+ if (sortOrderAscending) {
+ compareOp = CompareOp.LESS_OR_EQUAL;
+ if (isRowKey) {
+ // stopRow should be just greater than 'value'
+ stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ }
+ } else {
+ compareOp = CompareOp.GREATER_OR_EQUAL;
+ if (isRowKey) {
+ startRow = fieldValue;
+ }
+ }
+ break;
+ case "less_than":
+ if (sortOrderAscending) {
+ compareOp = CompareOp.LESS;
+ if (isRowKey) {
+ stopRow = fieldValue;
+ }
+ } else {
+ compareOp = CompareOp.GREATER;
+ if (isRowKey) {
+ // startRow should be just greater than 'value'
+ startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ }
+ }
+ break;
+ case "isnull":
+ case "isNull":
+ case "is null":
+ if (isRowKey) {
+ return null;
+ }
+ isNullTest = true;
+ compareOp = CompareOp.EQUAL;
+ comparator = new NullComparator();
+ break;
+ case "isnotnull":
+ case "isNotNull":
+ case "is not null":
+ if (isRowKey) {
+ return null;
+ }
+ compareOp = CompareOp.NOT_EQUAL;
+ comparator = new NullComparator();
+ break;
+ case "like":
+ /*
+ * Convert the LIKE operand to Regular Expression pattern so that we can
+ * apply RegexStringComparator()
+ */
+ HBaseRegexParser parser = new HBaseRegexParser(call).parse();
+ compareOp = CompareOp.EQUAL;
+ comparator = new RegexStringComparator(parser.getRegexString());
+
+ /*
+ * We can possibly do better if the LIKE operator is on the row_key
+ */
+ if (isRowKey) {
+ String prefix = parser.getPrefixString();
+ if (prefix != null) { // group 3 is literal
+ /*
+ * If there is a literal prefix, it can help us prune the scan to a sub range
+ */
+ if (prefix.equals(parser.getLikeString())) {
+ /* The operand value is literal. This turns the LIKE operator to EQUAL operator */
+ startRow = stopRow = fieldValue;
+ compareOp = null;
+ } else {
+ startRow = prefix.getBytes(Charsets.UTF_8);
+ stopRow = startRow.clone();
+ boolean isMaxVal = true;
+ for (int i = stopRow.length - 1; i >= 0 ; --i) {
+ int nextByteValue = (0xff & stopRow[i]) + 1;
+ if (nextByteValue < 0xff) {
+ stopRow[i] = (byte) nextByteValue;
+ isMaxVal = false;
+ break;
+ } else {
+ stopRow[i] = 0;
+ }
+ }
+ if (isMaxVal) {
+ stopRow = HConstants.EMPTY_END_ROW;
+ }
+ }
+ }
+ }
+ break;
+ }
+
+ if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
+ Filter filter = null;
+ if (isRowKey) {
+ if (compareOp != null) {
+ filter = new RowFilter(compareOp, comparator);
+ }
+ } else {
+ byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
+ byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
+ filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
+ ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
+ if (!isNullTest) {
+ ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
+ }
+ }
+ return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+ }
+ // else
+ return null;
+ }
+
+ private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
+ CompareFunctionsProcessor processor) {
+ byte[] startRow = processor.getRowKeyPrefixStartRow();
+ byte[] stopRow = processor.getRowKeyPrefixStopRow();
+ Filter filter = processor.getRowKeyPrefixFilter();
+
+ if (startRow != HConstants.EMPTY_START_ROW ||
+ stopRow != HConstants.EMPTY_END_ROW ||
+ filter != null) {
+ return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+ }
+
+ // else
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
new file mode 100644
index 0000000..827fa8c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression;
+import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.joda.time.LocalTime;
+import org.ojai.Value;
+import org.ojai.types.ODate;
+import org.ojai.types.OTime;
+import org.ojai.types.OTimestamp;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.mapr.db.rowcol.KeyValueBuilder;
+
+class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+ private String functionName;
+ private Boolean success;
+ private Value value;
+ private SchemaPath path;
+
+ public CompareFunctionsProcessor(String functionName) {
+ this.functionName = functionName;
+ this.success = false;
+ this.value = null;
+ }
+
+ public static boolean isCompareFunction(String functionName) {
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+ }
+
+ @Override
+ public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+ return false;
+ }
+
+ public static CompareFunctionsProcessor process(FunctionCall call) {
+ String functionName = call.getName();
+ LogicalExpression nameArg = call.args.get(0);
+ LogicalExpression valueArg = call.args.size() >= 2? call.args.get(1) : null;
+ CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
+
+ //if (valueArg != null) {
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+ LogicalExpression swapArg = valueArg;
+ valueArg = nameArg;
+ nameArg = swapArg;
+ evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+ }
+ evaluator.success = nameArg.accept(evaluator, valueArg);
+ //}
+
+ return evaluator;
+ }
+
+ public boolean isSuccess() {
+ // TODO Auto-generated method stub
+ return success;
+ }
+
+ public SchemaPath getPath() {
+ return path;
+ }
+
+ public Value getValue() {
+ return value;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ @Override
+ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+ // If valueArg is null, this might be a IS NULL/IS NOT NULL type of query
+ if (valueArg == null) {
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof QuotedString) {
+ this.value = KeyValueBuilder.initFrom(((QuotedString) valueArg).value);
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof IntExpression) {
+ this.value = KeyValueBuilder.initFrom(((IntExpression)valueArg).getInt());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof FloatExpression) {
+ this.value = KeyValueBuilder.initFrom(((FloatExpression)valueArg).getFloat());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof BooleanExpression) {
+ this.value = KeyValueBuilder.initFrom(((BooleanExpression)valueArg).getBoolean());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof Decimal28Expression) {
+ this.value = KeyValueBuilder.initFrom(((Decimal28Expression)valueArg).getBigDecimal());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof Decimal38Expression) {
+ this.value = KeyValueBuilder.initFrom(((Decimal38Expression)valueArg).getBigDecimal());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof DoubleExpression) {
+ this.value = KeyValueBuilder.initFrom(((DoubleExpression)valueArg).getDouble());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof LongExpression) {
+ this.value = KeyValueBuilder.initFrom(((LongExpression)valueArg).getLong());
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof DateExpression) {
+ long d = ((DateExpression)valueArg).getDate();
+ final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
+ int daysSinceEpoch = (int)(d / MILLISECONDS_IN_A_DAY);
+ this.value = KeyValueBuilder.initFrom(ODate.fromDaysSinceEpoch(daysSinceEpoch));
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof TimeExpression) {
+ int t = ((TimeExpression)valueArg).getTime();
+ LocalTime lT = LocalTime.fromMillisOfDay(t);
+ this.value = KeyValueBuilder.initFrom(new OTime(lT.getHourOfDay(), lT.getMinuteOfHour(), lT.getSecondOfMinute(), lT.getMillisOfSecond()));
+ this.path = path;
+ return true;
+ }
+
+ if (valueArg instanceof TimeStampExpression) {
+ this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp()));
+ this.path = path;
+ return true;
+ }
+
+ return false;
+ }
+
+ private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+ static {
+ ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+ VALUE_EXPRESSION_CLASSES = builder
+ .add(BooleanExpression.class)
+ .add(DateExpression.class)
+ .add(DoubleExpression.class)
+ .add(FloatExpression.class)
+ .add(IntExpression.class)
+ .add(LongExpression.class)
+ .add(QuotedString.class)
+ .add(TimeExpression.class)
+ .build();
+ }
+
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ // unary functions
+ .put("isnotnull", "isnotnull")
+ .put("isNotNull", "isNotNull")
+ .put("is not null", "is not null")
+ .put("isnull", "isnull")
+ .put("isNull", "isNull")
+ .put("is null", "is null")
+ // binary functions
+ .put("like", "like")
+ .put("equal", "equal")
+ .put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than")
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
new file mode 100644
index 0000000..16802ad
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.ojai.Value;
+import org.ojai.store.QueryCondition;
+import org.ojai.store.QueryCondition.Op;
+
+import com.google.common.collect.ImmutableList;
+import com.mapr.db.MapRDB;
+
+public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
+
+ final private JsonTableGroupScan groupScan;
+
+ final private LogicalExpression le;
+
+ private boolean allExpressionsConverted = true;
+
+ public JsonConditionBuilder(JsonTableGroupScan groupScan,
+ LogicalExpression conditionExp) {
+ this.groupScan = groupScan;
+ this.le = conditionExp;
+ }
+
+ public JsonScanSpec parseTree() {
+ JsonScanSpec parsedSpec = le.accept(this, null);
+ if (parsedSpec != null) {
+ parsedSpec.mergeScanSpec("booleanAnd", this.groupScan.getScanSpec());
+ }
+ return parsedSpec;
+ }
+
+ public boolean isAllExpressionsConverted() {
+ // TODO Auto-generated method stub
+ return allExpressionsConverted;
+ }
+
+ @Override
+ public JsonScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+ allExpressionsConverted = false;
+ return null;
+ }
+
+ @Override
+ public JsonScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+ return visitFunctionCall(op, value);
+ }
+
+ @Override
+ public JsonScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+ JsonScanSpec nodeScanSpec = null;
+ String functionName = call.getName();
+ ImmutableList<LogicalExpression> args = call.args;
+
+ if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+ CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call);
+ if (processor.isSuccess()) {
+ nodeScanSpec = createJsonScanSpec(call, processor);
+ }
+ } else {
+ switch(functionName) {
+ case "booleanAnd":
+ case "booleanOr":
+ nodeScanSpec = args.get(0).accept(this, null);
+ for (int i = 1; i < args.size(); ++i) {
+ JsonScanSpec nextScanSpec = args.get(i).accept(this, null);
+ if (nodeScanSpec != null && nextScanSpec != null) {
+ nodeScanSpec.mergeScanSpec(functionName, nextScanSpec);
+ } else {
+ allExpressionsConverted = false;
+ if ("booleanAnd".equals(functionName)) {
+ nodeScanSpec = nodeScanSpec == null ? nextScanSpec : nodeScanSpec;
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ if (nodeScanSpec == null) {
+ allExpressionsConverted = false;
+ }
+
+ return nodeScanSpec;
+ }
+
+ private void setIsCondition(QueryCondition c,
+ String str,
+ QueryCondition.Op op,
+ Value v) {
+ switch (v.getType()) {
+ case BOOLEAN:
+ c.is(str, op, v.getBoolean());
+ break;
+ case STRING:
+ c.is(str, op, v.getString());
+ break;
+ case BYTE:
+ c.is(str, op, v.getByte());
+ break;
+ case SHORT:
+ c.is(str, op, v.getShort());
+ break;
+ case INT:
+ c.is(str, op, v.getInt());
+ break;
+ case LONG:
+ c.is(str, op, v.getLong());
+ break;
+ case FLOAT:
+ c.is(str, op, v.getFloat());
+ break;
+ case DOUBLE:
+ c.is(str, op, v.getDouble());
+ break;
+ case DECIMAL:
+ c.is(str, op, v.getDecimal());
+ break;
+ case DATE:
+ c.is(str, op, v.getDate());
+ break;
+ case TIME:
+ c.is(str, op, v.getTime());
+ break;
+ case TIMESTAMP:
+ c.is(str, op, v.getTimestamp());
+ break;
+ case BINARY:
+ c.is(str, op, v.getBinary());
+ break;
+ // XXX/TODO: Map, Array?
+ default:
+ break;
+ }
+ }
+
+ private JsonScanSpec createJsonScanSpec(FunctionCall call,
+ CompareFunctionsProcessor processor) {
+ String functionName = processor.getFunctionName();
+ SchemaPath field = processor.getPath();
+ Value fieldValue = processor.getValue();
+
+ QueryCondition cond = null;
+ switch (functionName) {
+ case "equal":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.EQUAL, fieldValue);
+ cond.build();
+ break;
+
+ case "not_equal":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.NOT_EQUAL, fieldValue);
+ cond.build();
+ break;
+
+ case "less_than":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.LESS, fieldValue);
+ cond.build();
+ break;
+
+ case "less_than_or_equal_to":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.LESS_OR_EQUAL, fieldValue);
+ cond.build();
+ break;
+
+ case "greater_than":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.GREATER, fieldValue);
+ cond.build();
+ break;
+
+ case "greater_than_or_equal_to":
+ cond = MapRDB.newCondition();
+ setIsCondition(cond, field.getAsUnescapedPath(), Op.GREATER_OR_EQUAL, fieldValue);
+ cond.build();
+ break;
+
+ case "isnull":
+ cond = MapRDB.newCondition().notExists(field.getAsUnescapedPath()).build();
+ break;
+
+ case "isnotnull":
+ cond = MapRDB.newCondition().exists(field.getAsUnescapedPath()).build();
+ break;
+
+ case "istrue":
+ cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.EQUAL, true).build();
+ break;
+
+ case "isnotfalse":
+ cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.NOT_EQUAL, false).build();
+ break;
+
+ case "isfalse":
+ cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.EQUAL, false).build();
+ break;
+
+ case "isnottrue":
+ cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.NOT_EQUAL, true).build();
+ break;
+
+ case "like":
+ cond = MapRDB.newCondition().like(field.getAsUnescapedPath(), fieldValue.getString()).build();
+ break;
+
+ default:
+ }
+
+ if (cond != null) {
+ return new JsonScanSpec(groupScan.getTableName(), cond);
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
new file mode 100644
index 0000000..f316eeb
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonScanSpec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.ojai.store.QueryCondition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mapr.db.MapRDB;
+import com.mapr.db.impl.ConditionImpl;
+
+public class JsonScanSpec {
+ protected String tableName;
+ protected QueryCondition condition;
+
+ @JsonCreator
+ public JsonScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("condition") QueryCondition condition) {
+ this.tableName = tableName;
+ this.condition = condition;
+ }
+
+ public String getTableName() {
+ return this.tableName;
+ }
+
+ public byte[] getStartRow() {
+ if (condition == null) {
+ return HConstants.EMPTY_START_ROW;
+ }
+ return ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStartRow();
+ }
+
+ public byte[] getStopRow() {
+ if (condition == null) {
+ return HConstants.EMPTY_END_ROW;
+ }
+
+ return ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStopRow();
+ }
+
+ public Object getSerializedFilter() {
+ if (this.condition != null) {
+ return ((ConditionImpl)this.condition).getDescriptor().getSerialized();
+ }
+
+ return null;
+ }
+
+ public void setCondition(QueryCondition condition) {
+ this.condition = condition;
+ }
+
+ @JsonIgnore
+ public QueryCondition getCondition() {
+ return this.condition;
+ }
+
+ public void mergeScanSpec(String functionName, JsonScanSpec scanSpec) {
+
+ if (this.condition != null && scanSpec.getCondition() != null) {
+ QueryCondition newCond = MapRDB.newCondition();
+ switch (functionName) {
+ case "booleanAnd":
+ newCond.and();
+ break;
+ case "booleanOr":
+ newCond.or();
+ break;
+ default:
+ assert(false);
+ }
+
+ newCond.condition(this.condition)
+ .condition(scanSpec.getCondition())
+ .close()
+ .build();
+
+ this.condition = newCond;
+ } else if (scanSpec.getCondition() != null){
+ this.condition = scanSpec.getCondition();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JsonScanSpec [tableName=" + tableName
+ + ", condition=" + (condition == null ? null : condition.toString())
+ + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
new file mode 100644
index 0000000..3e5dce7
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonSubScanSpec.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.hadoop.hbase.HConstants;
+import org.ojai.DocumentConstants;
+import org.ojai.Value;
+import org.ojai.store.QueryCondition;
+import org.ojai.store.QueryCondition.Op;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mapr.db.MapRDB;
+import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.IdCodec;
+
+public class JsonSubScanSpec extends MapRDBSubScanSpec {
+
+ protected QueryCondition condition;
+
+ @JsonCreator
+ public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("regionServer") String regionServer,
+ @JsonProperty("startRow") byte[] startRow,
+ @JsonProperty("stopRow") byte[] stopRow,
+ @JsonProperty("cond") QueryCondition cond) {
+ super(tableName, regionServer, null, null, null, null);
+
+ this.condition = MapRDB.newCondition().and();
+
+ if (cond != null) {
+ this.condition.condition(cond);
+ }
+
+ if (startRow != null &&
+ Arrays.equals(startRow, HConstants.EMPTY_START_ROW) == false) {
+ Value startVal = IdCodec.decode(startRow);
+
+ switch(startVal.getType()) {
+ case BINARY:
+ this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getBinary());
+ break;
+ case STRING:
+ this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getString());
+ break;
+ default:
+ throw new IllegalStateException("Encountered an unsupported type " + startVal.getType()
+ + " for _id");
+ }
+ }
+
+ if (stopRow != null &&
+ Arrays.equals(stopRow, HConstants.EMPTY_END_ROW) == false) {
+ Value stopVal = IdCodec.decode(stopRow);
+
+ switch(stopVal.getType()) {
+ case BINARY:
+ this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getBinary());
+ break;
+ case STRING:
+ this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getString());
+ break;
+ default:
+ throw new IllegalStateException("Encountered an unsupported type " + stopVal.getType()
+ + " for _id");
+ }
+ }
+
+ this.condition.close().build();
+ }
+
+ public void setCondition(QueryCondition cond) {
+ condition = cond;
+ }
+
+ @JsonIgnore
+ public QueryCondition getCondition() {
+ return this.condition;
+ }
+
+ @Override
+ public byte[] getSerializedFilter() {
+ if (this.condition != null) {
+ ByteBuffer bbuf = ((ConditionImpl)this.condition).getDescriptor().getSerialized();
+ byte[] serFilter = new byte[bbuf.limit() - bbuf.position()];
+ bbuf.get(serFilter);
+ return serFilter;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
new file mode 100644
index 0000000..9e23af7
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
+import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.TabletInfo;
+import com.mapr.db.impl.TabletInfoImpl;
+
+@JsonTypeName("maprdb-json-scan")
+public class JsonTableGroupScan extends MapRDBGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableGroupScan.class);
+
+ public static final String TABLE_JSON = "json";
+
+ private MapRDBTableStats tableStats;
+
+ private JsonScanSpec scanSpec;
+
+ @JsonCreator
+ public JsonTableGroupScan(@JsonProperty("userName") final String userName,
+ @JsonProperty("scanSpec") JsonScanSpec scanSpec,
+ @JsonProperty("storage") FileSystemConfig storagePluginConfig,
+ @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+ this (userName,
+ (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+ (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+ scanSpec, columns);
+ }
+
+ public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+ MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
+ super(storagePlugin, formatPlugin, columns, userName);
+ this.scanSpec = scanSpec;
+ init();
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ * @param that The HBaseGroupScan to clone
+ */
+ private JsonTableGroupScan(JsonTableGroupScan that) {
+ super(that);
+ this.scanSpec = that.scanSpec;
+ this.endpointFragmentMapping = that.endpointFragmentMapping;
+ this.tableStats = that.tableStats;
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ JsonTableGroupScan newScan = new JsonTableGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
+
+ private void init() {
+ logger.debug("Getting tablet locations");
+ try {
+ Configuration conf = new Configuration();
+ Table t = MapRDB.getTable(scanSpec.getTableName());
+ TabletInfo[] tabletInfos = t.getTabletInfos(scanSpec.getCondition());
+ tableStats = new MapRDBTableStats(conf, scanSpec.getTableName());
+
+ boolean foundStartRegion = false;
+ regionsToScan = new TreeMap<TabletFragmentInfo, String>();
+ for (TabletInfo tabletInfo : tabletInfos) {
+ TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
+ if (!foundStartRegion
+ && !isNullOrEmpty(scanSpec.getStartRow())
+ && !tabletInfoImpl.containsRow(scanSpec.getStartRow())) {
+ continue;
+ }
+ foundStartRegion = true;
+ regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), tabletInfo.getLocations()[0]);
+ if (!isNullOrEmpty(scanSpec.getStopRow())
+ && tabletInfoImpl.containsRow(scanSpec.getStopRow())) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error getting region info for table: " + scanSpec.getTableName(), e);
+ }
+ }
+
+ protected JsonSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+ // XXX/TODO check filter/Condition
+ JsonScanSpec spec = scanSpec;
+ JsonSubScanSpec subScanSpec = new JsonSubScanSpec(
+ spec.getTableName(),
+ regionsToScan.get(tfi),
+ (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
+ (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
+ spec.getCondition());
+ return subScanSpec;
+ }
+
+ @Override
+ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+ assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+ "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+ minorFragmentId);
+ return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
+ endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ //TODO: look at stats for this.
+ long rowCount = (long) ((scanSpec.getSerializedFilter() != null ? .5 : 1) * tableStats.getNumRows());
+ int avgColumnSize = 10;
+ int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+ }
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new JsonTableGroupScan(this);
+ }
+
+ @JsonIgnore
+ public String getTableName() {
+ return scanSpec.getTableName();
+ }
+
+ @Override
+ public String toString() {
+ return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + "]";
+ }
+
+ public JsonScanSpec getScanSpec() {
+ return scanSpec;
+ }
+
+}