You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:23 UTC
[36/50] [abbrv] drill git commit: Refactoring code for better
organization.
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
deleted file mode 100644
index 59d0d01..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.binary;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin;
-import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig;
-import org.apache.drill.exec.store.maprdb.MapRDBGroupScan;
-import org.apache.drill.exec.store.maprdb.MapRDBSubScan;
-import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
-import org.apache.drill.exec.store.maprdb.MapRDBTableStats;
-import org.apache.drill.exec.store.maprdb.TabletFragmentInfo;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.codehaus.jackson.annotate.JsonCreator;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("maprdb-binary-scan")
-public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class);
-
- public static final String TABLE_BINARY = "binary";
-
- private HBaseScanSpec hbaseScanSpec;
-
- private HTableDescriptor hTableDesc;
-
- private MapRDBTableStats tableStats;
-
- @JsonCreator
- public BinaryTableGroupScan(@JsonProperty("userName") final String userName,
- @JsonProperty("hbaseScanSpec") HBaseScanSpec scanSpec,
- @JsonProperty("storage") FileSystemConfig storagePluginConfig,
- @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName,
- (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
- scanSpec, columns);
- }
-
- public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
- super(storagePlugin, formatPlugin, columns, userName);
- this.hbaseScanSpec = scanSpec;
- init();
- }
-
- /**
- * Private constructor, used for cloning.
- * @param that The BinaryTableGroupScan to clone
- */
- private BinaryTableGroupScan(BinaryTableGroupScan that) {
- super(that);
- this.hbaseScanSpec = that.hbaseScanSpec;
- this.endpointFragmentMapping = that.endpointFragmentMapping;
- this.hTableDesc = that.hTableDesc;
- this.tableStats = that.tableStats;
- }
-
- @Override
- public GroupScan clone(List<SchemaPath> columns) {
- BinaryTableGroupScan newScan = new BinaryTableGroupScan(this);
- newScan.columns = columns;
- newScan.verifyColumns();
- return newScan;
- }
-
- private void init() {
- logger.debug("Getting region locations");
- try {
- Configuration conf = HBaseConfiguration.create();
- HTable table = new HTable(conf, hbaseScanSpec.getTableName());
- tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
- this.hTableDesc = table.getTableDescriptor();
- NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
- table.close();
-
- boolean foundStartRegion = false;
- regionsToScan = new TreeMap<TabletFragmentInfo, String>();
- for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
- HRegionInfo regionInfo = mapEntry.getKey();
- if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
- continue;
- }
- foundStartRegion = true;
- regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
- if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
- break;
- }
- }
- } catch (Exception e) {
- throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
- }
- verifyColumns();
- }
-
- private void verifyColumns() {
- /*
- if (columns != null) {
- for (SchemaPath column : columns) {
- if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
- DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
- column.getRootSegment().getPath(), hTableDesc.getNameAsString());
- }
- }
- }
- */
- }
-
- protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
- HBaseScanSpec spec = hbaseScanSpec;
- MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
- spec.getTableName(),
- regionsToScan.get(tfi),
- (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
- (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
- spec.getSerializedFilter(),
- null);
- return subScanSpec;
- }
-
- private boolean isNullOrEmpty(byte[] key) {
- return key == null || key.length == 0;
- }
-
- @Override
- public MapRDBSubScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < endpointFragmentMapping.size() : String.format(
- "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
- minorFragmentId);
- return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
- endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
- }
-
- @Override
- public ScanStats getScanStats() {
- //TODO: look at stats for this.
- long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
- int avgColumnSize = 10;
- int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new BinaryTableGroupScan(this);
- }
-
- @JsonIgnore
- public Configuration getHBaseConf() {
- return HBaseConfiguration.create();
- }
-
- @JsonIgnore
- public String getTableName() {
- return getHBaseScanSpec().getTableName();
- }
-
- @Override
- public String toString() {
- return "BinaryTableGroupScan [ScanSpec="
- + hbaseScanSpec + ", columns="
- + columns + "]";
- }
-
- @JsonProperty
- public HBaseScanSpec getHBaseScanSpec() {
- return hbaseScanSpec;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
deleted file mode 100644
index f06786d..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.binary;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.drill.common.expression.CastExpression;
-import org.apache.drill.common.expression.ConvertExpression;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DateExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.hadoop.hbase.util.Order;
-import org.apache.hadoop.hbase.util.PositionedByteRange;
-import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
-
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
- private byte[] value;
- private boolean success;
- private boolean isEqualityFn;
- private SchemaPath path;
- private String functionName;
- private boolean sortOrderAscending;
-
- // Fields for row-key prefix comparison
- // If the query is on a row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter.
- // Hence, we use these fields (set depending upon the encoding type in the user query).
- private boolean isRowKeyPrefixComparison;
- byte[] rowKeyPrefixStartRow;
- byte[] rowKeyPrefixStopRow;
- Filter rowKeyPrefixFilter;
-
- public static boolean isCompareFunction(String functionName) {
- return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
- }
-
- public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
- String functionName = call.getName();
- LogicalExpression nameArg = call.args.get(0);
- LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
- CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
-
- if (valueArg != null) { // binary function
- if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
- LogicalExpression swapArg = valueArg;
- valueArg = nameArg;
- nameArg = swapArg;
- evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
- }
- evaluator.success = nameArg.accept(evaluator, valueArg);
- } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
- evaluator.success = true;
- evaluator.path = (SchemaPath) nameArg;
- }
-
- return evaluator;
- }
-
- public CompareFunctionsProcessor(String functionName) {
- this.success = false;
- this.functionName = functionName;
- this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
- && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
- this.isRowKeyPrefixComparison = false;
- this.sortOrderAscending = true;
- }
-
- public byte[] getValue() {
- return value;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public SchemaPath getPath() {
- return path;
- }
-
- public String getFunctionName() {
- return functionName;
- }
-
- public boolean isRowKeyPrefixComparison() {
- return isRowKeyPrefixComparison;
- }
-
- public byte[] getRowKeyPrefixStartRow() {
- return rowKeyPrefixStartRow;
- }
-
- public byte[] getRowKeyPrefixStopRow() {
- return rowKeyPrefixStopRow;
- }
-
- public Filter getRowKeyPrefixFilter() {
- return rowKeyPrefixFilter;
- }
-
- public boolean isSortOrderAscending() {
- return sortOrderAscending;
- }
-
- @Override
- public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
- if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
- return e.getInput().accept(this, valueArg);
- }
- return false;
- }
-
- @Override
- public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
- if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) {
-
- String encodingType = e.getEncodingType();
- int prefixLength = 0;
-
- // Handle scan pruning in the following scenario:
- // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
- // querying for the first few bytes of the row-key(start-offset 1)
- // Example WHERE clause:
- // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
- if (e.getInput() instanceof FunctionCall) {
-
- // We can prune scan range only for big-endian encoded data
- if (encodingType.endsWith("_BE") == false) {
- return false;
- }
-
- FunctionCall call = (FunctionCall)e.getInput();
- String functionName = call.getName();
- if (!functionName.equalsIgnoreCase("byte_substr")) {
- return false;
- }
-
- LogicalExpression nameArg = call.args.get(0);
- LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
- LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
-
- if (((nameArg instanceof SchemaPath) == false) ||
- (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) ||
- (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) {
- return false;
- }
-
- boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
- int offset = ((IntExpression)valueArg1).getInt();
-
- if (!isRowKey || (offset != 1)) {
- return false;
- }
-
- this.path = (SchemaPath)nameArg;
- prefixLength = ((IntExpression)valueArg2).getInt();
- this.isRowKeyPrefixComparison = true;
- return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
- }
-
- if (e.getInput() instanceof SchemaPath) {
- ByteBuf bb = null;
-
- switch (encodingType) {
- case "INT_BE":
- case "INT":
- case "UINT_BE":
- case "UINT":
- case "UINT4_BE":
- case "UINT4":
- if (valueArg instanceof IntExpression
- && (isEqualityFn || encodingType.startsWith("U"))) {
- bb = newByteBuf(4, encodingType.endsWith("_BE"));
- bb.writeInt(((IntExpression)valueArg).getInt());
- }
- break;
- case "BIGINT_BE":
- case "BIGINT":
- case "UINT8_BE":
- case "UINT8":
- if (valueArg instanceof LongExpression
- && (isEqualityFn || encodingType.startsWith("U"))) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((LongExpression)valueArg).getLong());
- }
- break;
- case "FLOAT":
- if (valueArg instanceof FloatExpression && isEqualityFn) {
- bb = newByteBuf(4, true);
- bb.writeFloat(((FloatExpression)valueArg).getFloat());
- }
- break;
- case "DOUBLE":
- if (valueArg instanceof DoubleExpression && isEqualityFn) {
- bb = newByteBuf(8, true);
- bb.writeDouble(((DoubleExpression)valueArg).getDouble());
- }
- break;
- case "TIME_EPOCH":
- case "TIME_EPOCH_BE":
- if (valueArg instanceof TimeExpression) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((TimeExpression)valueArg).getTime());
- }
- break;
- case "DATE_EPOCH":
- case "DATE_EPOCH_BE":
- if (valueArg instanceof DateExpression) {
- bb = newByteBuf(8, encodingType.endsWith("_BE"));
- bb.writeLong(((DateExpression)valueArg).getDate());
- }
- break;
- case "BOOLEAN_BYTE":
- if (valueArg instanceof BooleanExpression) {
- bb = newByteBuf(1, false /* does not matter */);
- bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
- }
- break;
- case "DOUBLE_OB":
- case "DOUBLE_OBD":
- if (valueArg instanceof DoubleExpression) {
- bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
- ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
- ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING);
- }
- }
- break;
- case "FLOAT_OB":
- case "FLOAT_OBD":
- if (valueArg instanceof FloatExpression) {
- bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
- ((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
- ((FloatExpression)valueArg).getFloat(), Order.ASCENDING);
- }
- }
- break;
- case "BIGINT_OB":
- case "BIGINT_OBD":
- if (valueArg instanceof LongExpression) {
- bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
- ((LongExpression)valueArg).getLong(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
- ((LongExpression)valueArg).getLong(), Order.ASCENDING);
- }
- }
- break;
- case "INT_OB":
- case "INT_OBD":
- if (valueArg instanceof IntExpression) {
- bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
- ((IntExpression)valueArg).getInt(), Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
- ((IntExpression)valueArg).getInt(), Order.ASCENDING);
- }
- }
- break;
- case "UTF8_OB":
- case "UTF8_OBD":
- if (valueArg instanceof QuotedString) {
- int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
- bb = newByteBuf(stringLen + 2, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
- if (encodingType.endsWith("_OBD")) {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
- ((QuotedString)valueArg).value, Order.DESCENDING);
- this.sortOrderAscending = false;
- } else {
- org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
- ((QuotedString)valueArg).value, Order.ASCENDING);
- }
- }
- break;
- case "UTF8":
- // let visitSchemaPath() handle this.
- return e.getInput().accept(this, valueArg);
- }
-
- if (bb != null) {
- this.value = bb.array();
- this.path = (SchemaPath)e.getInput();
- return true;
- }
- }
- }
- return false;
- }
-
- private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
- int prefixLength, LogicalExpression valueArg) {
- String encodingType = e.getEncodingType();
- rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
- rowKeyPrefixStopRow = HConstants.EMPTY_START_ROW;
- rowKeyPrefixFilter = null;
-
- if ((encodingType.compareTo("UINT4_BE") == 0) ||
- (encodingType.compareTo("UINT_BE") == 0)) {
- if (prefixLength != 4) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- int val;
- if ((valueArg instanceof IntExpression) == false) {
- return false;
- }
-
- val = ((IntExpression)valueArg).getInt();
-
- // For UINT4_BE/UINT_BE encoding, the operators that we push-down are =, <, <=, >, >= (<> is not handled below)
- switch (functionName) {
- case "equal":
- rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "greater_than_or_equal_to":
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
- return true;
- case "greater_than":
- rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "less_than_or_equal_to":
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
- return true;
- case "less_than":
- rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
- return true;
- }
-
- return false;
- }
-
- if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
- (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
- (encodingType.compareTo("UINT8_BE") == 0)) {
-
- if (prefixLength != 8) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- long val;
- if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
- if ((valueArg instanceof TimeExpression) == false) {
- return false;
- }
-
- val = ((TimeExpression)valueArg).getTime();
- } else if (encodingType.compareTo("UINT8_BE") == 0){
- if ((valueArg instanceof LongExpression) == false) {
- return false;
- }
-
- val = ((LongExpression)valueArg).getLong();
- } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
- if ((valueArg instanceof TimeStampExpression) == false) {
- return false;
- }
-
- val = ((TimeStampExpression)valueArg).getTimeStamp();
- } else {
- // Should not reach here.
- return false;
- }
-
- // For TIMESTAMP_EPOCH_BE/TIME_EPOCH_BE/UINT8_BE encoding, the operators that we push-down are =, <, <=, >, >= (<> is not handled below)
- switch (functionName) {
- case "equal":
- rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "greater_than_or_equal_to":
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
- return true;
- case "greater_than":
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "less_than_or_equal_to":
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
- return true;
- case "less_than":
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
- return true;
- }
-
- return false;
- }
-
- if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
- if ((valueArg instanceof DateExpression) == false) {
- return false;
- }
-
- if (prefixLength != 8) {
- throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
- }
-
- final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
- long dateToSet;
- // For DATE_EPOCH_BE encoding, the operators that we push-down are =, <, <=, >, >= (<> is not handled below)
- switch (functionName) {
- case "equal":
- long startDate = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
- long stopDate = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
- return true;
- case "greater_than_or_equal_to":
- dateToSet = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "greater_than":
- dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "less_than_or_equal_to":
- dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- case "less_than":
- dateToSet = ((DateExpression)valueArg).getDate();
- rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
- return true;
- }
-
- return false;
- }
-
- return false;
- }
-
- @Override
- public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
- return false;
- }
-
- @Override
- public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
- if (valueArg instanceof QuotedString) {
- this.value = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8);
- this.path = path;
- return true;
- }
- return false;
- }
-
- private static ByteBuf newByteBuf(int size, boolean bigEndian) {
- return Unpooled.wrappedBuffer(new byte[size])
- .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
- .writerIndex(0);
- }
-
- private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
- static {
- ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
- VALUE_EXPRESSION_CLASSES = builder
- .add(BooleanExpression.class)
- .add(DateExpression.class)
- .add(DoubleExpression.class)
- .add(FloatExpression.class)
- .add(IntExpression.class)
- .add(LongExpression.class)
- .add(QuotedString.class)
- .add(TimeExpression.class)
- .build();
- }
-
- private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
- static {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
- // unary functions
- .put("isnotnull", "isnotnull")
- .put("isNotNull", "isNotNull")
- .put("is not null", "is not null")
- .put("isnull", "isnull")
- .put("isNull", "isNull")
- .put("is null", "is null")
- // binary functions
- .put("like", "like")
- .put("equal", "equal")
- .put("not_equal", "not_equal")
- .put("greater_than_or_equal_to", "less_than_or_equal_to")
- .put("greater_than", "less_than")
- .put("less_than_or_equal_to", "greater_than_or_equal_to")
- .put("less_than", "greater_than")
- .build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
deleted file mode 100644
index c4de6bb..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.binary;
-
-import java.util.Arrays;
-
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseRegexParser;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.hbase.HBaseUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.NullComparator;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-
-public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
-
- final private BinaryTableGroupScan groupScan;
-
- final private LogicalExpression le;
-
- private boolean allExpressionsConverted = true;
-
- private static Boolean nullComparatorSupported = null;
-
- public MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
- this.groupScan = groupScan;
- this.le = le;
- }
-
- public HBaseScanSpec parseTree() {
- HBaseScanSpec parsedSpec = le.accept(this, null);
- if (parsedSpec != null) {
- parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
- /*
- * If RowFilter is THE filter attached to the scan specification,
- * remove it since its effect is also achieved through startRow and stopRow.
- */
- Filter filter = parsedSpec.getFilter();
- if (filter instanceof RowFilter &&
- ((RowFilter)filter).getOperator() != CompareOp.NOT_EQUAL &&
- ((RowFilter)filter).getComparator() instanceof BinaryComparator) {
- parsedSpec = new HBaseScanSpec(parsedSpec.getTableName(), parsedSpec.getStartRow(), parsedSpec.getStopRow(), null);
- }
- }
- return parsedSpec;
- }
-
- public boolean isAllExpressionsConverted() {
- return allExpressionsConverted;
- }
-
- @Override
- public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
- allExpressionsConverted = false;
- return null;
- }
-
- @Override
- public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
- return visitFunctionCall(op, value);
- }
-
- @Override
- public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
- HBaseScanSpec nodeScanSpec = null;
- String functionName = call.getName();
- ImmutableList<LogicalExpression> args = call.args;
-
- if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
- /*
- * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
- * causes a filter with NullComparator to fail. Enable only if specified in
- * the configuration (after ensuring that the HBase cluster has the fix).
- */
- if (nullComparatorSupported == null) {
- nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
- }
-
- CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
- if (processor.isSuccess()) {
- nodeScanSpec = createHBaseScanSpec(call, processor);
- }
- } else {
- switch (functionName) {
- case "booleanAnd":
- case "booleanOr":
- HBaseScanSpec firstScanSpec = args.get(0).accept(this, null);
- for (int i = 1; i < args.size(); ++i) {
- HBaseScanSpec nextScanSpec = args.get(i).accept(this, null);
- if (firstScanSpec != null && nextScanSpec != null) {
- nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
- } else {
- allExpressionsConverted = false;
- if ("booleanAnd".equals(functionName)) {
- nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
- }
- }
- firstScanSpec = nodeScanSpec;
- }
- break;
- }
- }
-
- if (nodeScanSpec == null) {
- allExpressionsConverted = false;
- }
-
- return nodeScanSpec;
- }
-
- private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
- Filter newFilter = null;
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
-
- switch (functionName) {
- case "booleanAnd":
- newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
- startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
- stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
- break;
- case "booleanOr":
- newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
- startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
- stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
- }
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
- }
-
- private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) {
- String functionName = processor.getFunctionName();
- SchemaPath field = processor.getPath();
- byte[] fieldValue = processor.getValue();
- boolean sortOrderAscending = processor.isSortOrderAscending();
- boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
- if (!(isRowKey
- || (!field.getRootSegment().isLastPath()
- && field.getRootSegment().getChild().isLastPath()
- && field.getRootSegment().getChild().isNamed())
- )
- ) {
- /*
- * if the field in this function is neither the row_key nor a qualified HBase column, return.
- */
- return null;
- }
-
- if (processor.isRowKeyPrefixComparison()) {
- return createRowKeyPrefixScanSpec(call, processor);
- }
-
- CompareOp compareOp = null;
- boolean isNullTest = false;
- ByteArrayComparable comparator = new BinaryComparator(fieldValue);
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
- switch (functionName) {
- case "equal":
- compareOp = CompareOp.EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- /* stopRow should be just greater than 'value'*/
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- compareOp = CompareOp.EQUAL;
- }
- break;
- case "not_equal":
- compareOp = CompareOp.NOT_EQUAL;
- break;
- case "greater_than_or_equal_to":
- if (sortOrderAscending) {
- compareOp = CompareOp.GREATER_OR_EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- }
- } else {
- compareOp = CompareOp.LESS_OR_EQUAL;
- if (isRowKey) {
- // stopRow should be just greater than 'value'
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- }
- break;
- case "greater_than":
- if (sortOrderAscending) {
- compareOp = CompareOp.GREATER;
- if (isRowKey) {
- // startRow should be just greater than 'value'
- startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- } else {
- compareOp = CompareOp.LESS;
- if (isRowKey) {
- stopRow = fieldValue;
- }
- }
- break;
- case "less_than_or_equal_to":
- if (sortOrderAscending) {
- compareOp = CompareOp.LESS_OR_EQUAL;
- if (isRowKey) {
- // stopRow should be just greater than 'value'
- stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- } else {
- compareOp = CompareOp.GREATER_OR_EQUAL;
- if (isRowKey) {
- startRow = fieldValue;
- }
- }
- break;
- case "less_than":
- if (sortOrderAscending) {
- compareOp = CompareOp.LESS;
- if (isRowKey) {
- stopRow = fieldValue;
- }
- } else {
- compareOp = CompareOp.GREATER;
- if (isRowKey) {
- // startRow should be just greater than 'value'
- startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
- }
- }
- break;
- case "isnull":
- case "isNull":
- case "is null":
- if (isRowKey) {
- return null;
- }
- isNullTest = true;
- compareOp = CompareOp.EQUAL;
- comparator = new NullComparator();
- break;
- case "isnotnull":
- case "isNotNull":
- case "is not null":
- if (isRowKey) {
- return null;
- }
- compareOp = CompareOp.NOT_EQUAL;
- comparator = new NullComparator();
- break;
- case "like":
- /*
- * Convert the LIKE operand to Regular Expression pattern so that we can
- * apply RegexStringComparator()
- */
- HBaseRegexParser parser = new HBaseRegexParser(call).parse();
- compareOp = CompareOp.EQUAL;
- comparator = new RegexStringComparator(parser.getRegexString());
-
- /*
- * We can possibly do better if the LIKE operator is on the row_key
- */
- if (isRowKey) {
- String prefix = parser.getPrefixString();
- if (prefix != null) { // group 3 is literal
- /*
- * If there is a literal prefix, it can help us prune the scan to a sub range
- */
- if (prefix.equals(parser.getLikeString())) {
- /* The operand value is literal. This turns the LIKE operator to EQUAL operator */
- startRow = stopRow = fieldValue;
- compareOp = null;
- } else {
- startRow = prefix.getBytes(Charsets.UTF_8);
- stopRow = startRow.clone();
- boolean isMaxVal = true;
- for (int i = stopRow.length - 1; i >= 0 ; --i) {
- int nextByteValue = (0xff & stopRow[i]) + 1;
- if (nextByteValue < 0xff) {
- stopRow[i] = (byte) nextByteValue;
- isMaxVal = false;
- break;
- } else {
- stopRow[i] = 0;
- }
- }
- if (isMaxVal) {
- stopRow = HConstants.EMPTY_END_ROW;
- }
- }
- }
- }
- break;
- }
-
- if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
- Filter filter = null;
- if (isRowKey) {
- if (compareOp != null) {
- filter = new RowFilter(compareOp, comparator);
- }
- } else {
- byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
- byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
- filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
- ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
- if (!isNullTest) {
- ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
- }
- }
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
- }
- // else
- return null;
- }
-
- private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
- CompareFunctionsProcessor processor) {
- byte[] startRow = processor.getRowKeyPrefixStartRow();
- byte[] stopRow = processor.getRowKeyPrefixStopRow();
- Filter filter = processor.getRowKeyPrefixFilter();
-
- if (startRow != HConstants.EMPTY_START_ROW ||
- stopRow != HConstants.EMPTY_END_ROW ||
- filter != null) {
- return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
- }
-
- // else
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/CompareFunctionsProcessor.java
deleted file mode 100644
index dc5c2b7..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/CompareFunctionsProcessor.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.json;
-
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DateExpression;
-import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression;
-import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.joda.time.LocalTime;
-import org.ojai.Value;
-import org.ojai.types.ODate;
-import org.ojai.types.OTime;
-import org.ojai.types.OTimestamp;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.mapr.db.rowcol.KeyValueBuilder;
-
-/**
- * Analyses a comparison function call and, when it has the form {@code field <op> literal}
- * (in either operand order) or is a unary test on a field, extracts the field path, the
- * literal as an OJAI {@link Value}, and the function name normalised so that the field is
- * the left operand.
- */
-class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
-
-  /** Function name; replaced by its transposed counterpart when the operands are exchanged. */
-  private String functionName;
-  /** Whether the call was recognised and {@link #path} (and, for binary calls, {@link #value}) is set. */
-  private Boolean success;
-  /** The literal operand; stays {@code null} for unary functions. */
-  private Value value;
-  /** The field operand. */
-  private SchemaPath path;
-
-  public CompareFunctionsProcessor(String functionName) {
-    this.functionName = functionName;
-    this.success = false;
-    this.value = null;
-  }
-
-  /**
-   * @return {@code true} if {@code functionName} is one of the comparison functions this class handles
-   */
-  public static boolean isCompareFunction(String functionName) {
-    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
-  }
-
-  /** Anything other than a schema path in the field position is not supported. */
-  @Override
-  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
-    return false;
-  }
-
-  /**
-   * Analyses {@code call}. If the first argument is a literal and a second argument exists,
-   * the two are exchanged and the function name is replaced by its transposed counterpart.
-   *
-   * @param call the comparison function call
-   * @return a processor whose {@link #isSuccess()} tells whether the call was recognised
-   */
-  public static CompareFunctionsProcessor process(FunctionCall call) {
-    String functionName = call.getName();
-    CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
-    if (call.args.isEmpty()) {
-      // Nothing to analyse; report failure instead of failing on args.get(0).
-      return evaluator;
-    }
-
-    LogicalExpression nameArg = call.args.get(0);
-    LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
-
-    // Exchange the operands only when there is a second operand to exchange with; otherwise
-    // a unary call on a literal would leave nameArg null and fail on the accept() below.
-    if (valueArg != null && VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
-      LogicalExpression swapArg = valueArg;
-      valueArg = nameArg;
-      nameArg = swapArg;
-      evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
-    }
-    evaluator.success = nameArg.accept(evaluator, valueArg);
-
-    return evaluator;
-  }
-
-  public boolean isSuccess() {
-    return success;
-  }
-
-  public SchemaPath getPath() {
-    return path;
-  }
-
-  public Value getValue() {
-    return value;
-  }
-
-  public String getFunctionName() {
-    return functionName;
-  }
-
-  /**
-   * Records {@code path} as the field operand and converts {@code valueArg} to an OJAI value.
-   *
-   * @return {@code true} if {@code valueArg} is absent or is one of the supported literal
-   *         types, {@code false} otherwise
-   */
-  @Override
-  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
-    // If valueArg is null, this might be a IS NULL/IS NOT NULL type of query
-    if (valueArg == null) {
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof QuotedString) {
-      this.value = KeyValueBuilder.initFrom(((QuotedString) valueArg).value);
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof IntExpression) {
-      this.value = KeyValueBuilder.initFrom(((IntExpression)valueArg).getInt());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof FloatExpression) {
-      this.value = KeyValueBuilder.initFrom(((FloatExpression)valueArg).getFloat());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof BooleanExpression) {
-      this.value = KeyValueBuilder.initFrom(((BooleanExpression)valueArg).getBoolean());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof Decimal28Expression) {
-      this.value = KeyValueBuilder.initFrom(((Decimal28Expression)valueArg).getBigDecimal());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof Decimal38Expression) {
-      this.value = KeyValueBuilder.initFrom(((Decimal38Expression)valueArg).getBigDecimal());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof DoubleExpression) {
-      this.value = KeyValueBuilder.initFrom(((DoubleExpression)valueArg).getDouble());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof LongExpression) {
-      this.value = KeyValueBuilder.initFrom(((LongExpression)valueArg).getLong());
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof DateExpression) {
-      // The expression carries milliseconds; ODate is built from whole days since the epoch.
-      long d = ((DateExpression)valueArg).getDate();
-      final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
-      int daysSinceEpoch = (int)(d / MILLISECONDS_IN_A_DAY);
-      this.value = KeyValueBuilder.initFrom(ODate.fromDaysSinceEpoch(daysSinceEpoch));
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof TimeExpression) {
-      int t = ((TimeExpression)valueArg).getTime();
-      LocalTime lT = LocalTime.fromMillisOfDay(t);
-      this.value = KeyValueBuilder.initFrom(new OTime(lT.getHourOfDay(), lT.getMinuteOfHour(), lT.getSecondOfMinute(), lT.getMillisOfSecond()));
-      this.path = path;
-      return true;
-    }
-
-    if (valueArg instanceof TimeStampExpression) {
-      this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp()));
-      this.path = path;
-      return true;
-    }
-
-    return false;
-  }
-
-  /**
-   * Literal expression classes. When the first argument of a binary call is one of these,
-   * the operands are exchanged. The set covers every literal type that
-   * {@link #visitSchemaPath} is able to convert.
-   */
-  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
-  static {
-    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
-    VALUE_EXPRESSION_CLASSES = builder
-        .add(BooleanExpression.class)
-        .add(DateExpression.class)
-        .add(Decimal28Expression.class)
-        .add(Decimal38Expression.class)
-        .add(DoubleExpression.class)
-        .add(FloatExpression.class)
-        .add(IntExpression.class)
-        .add(LongExpression.class)
-        .add(QuotedString.class)
-        .add(TimeExpression.class)
-        .add(TimeStampExpression.class)
-        .build();
-  }
-
-  /**
-   * Maps each supported function to the function expressing the same predicate once its two
-   * operands are exchanged. Symmetric and unary functions map to themselves.
-   */
-  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
-  static {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
-        // unary functions
-        .put("isnotnull", "isnotnull")
-        .put("isNotNull", "isNotNull")
-        .put("is not null", "is not null")
-        .put("isnull", "isnull")
-        .put("isNull", "isNull")
-        .put("is null", "is null")
-        // binary functions
-        .put("like", "like")
-        .put("equal", "equal")
-        .put("not_equal", "not_equal")
-        .put("greater_than_or_equal_to", "less_than_or_equal_to")
-        .put("greater_than", "less_than")
-        .put("less_than_or_equal_to", "greater_than_or_equal_to")
-        .put("less_than", "greater_than")
-        .build();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonConditionBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonConditionBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonConditionBuilder.java
deleted file mode 100644
index 889f5a9..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonConditionBuilder.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.json;
-
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.ojai.Value;
-import org.ojai.store.QueryCondition;
-import org.ojai.store.QueryCondition.Op;
-
-import com.google.common.collect.ImmutableList;
-import com.mapr.db.MapRDB;
-
-public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
-
- final private JsonTableGroupScan groupScan;
-
- final private LogicalExpression le;
-
- private boolean allExpressionsConverted = true;
-
- public JsonConditionBuilder(JsonTableGroupScan groupScan,
- LogicalExpression conditionExp) {
- this.groupScan = groupScan;
- this.le = conditionExp;
- }
-
- public JsonScanSpec parseTree() {
- JsonScanSpec parsedSpec = le.accept(this, null);
- if (parsedSpec != null) {
- parsedSpec.mergeScanSpec("booleanAnd", this.groupScan.getScanSpec());
- }
- return parsedSpec;
- }
-
- public boolean isAllExpressionsConverted() {
- // TODO Auto-generated method stub
- return allExpressionsConverted;
- }
-
- @Override
- public JsonScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
- allExpressionsConverted = false;
- return null;
- }
-
- @Override
- public JsonScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
- return visitFunctionCall(op, value);
- }
-
- @Override
- public JsonScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
- JsonScanSpec nodeScanSpec = null;
- String functionName = call.getName();
- ImmutableList<LogicalExpression> args = call.args;
-
- if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
- CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call);
- if (processor.isSuccess()) {
- nodeScanSpec = createJsonScanSpec(call, processor);
- }
- } else {
- switch(functionName) {
- case "booleanAnd":
- case "booleanOr":
- nodeScanSpec = args.get(0).accept(this, null);
- for (int i = 1; i < args.size(); ++i) {
- JsonScanSpec nextScanSpec = args.get(i).accept(this, null);
- if (nodeScanSpec != null && nextScanSpec != null) {
- nodeScanSpec.mergeScanSpec(functionName, nextScanSpec);
- } else {
- allExpressionsConverted = false;
- if ("booleanAnd".equals(functionName)) {
- nodeScanSpec = nodeScanSpec == null ? nextScanSpec : nodeScanSpec;
- }
- }
- }
- break;
- }
- }
-
- if (nodeScanSpec == null) {
- allExpressionsConverted = false;
- }
-
- return nodeScanSpec;
- }
-
- private void setIsCondition(QueryCondition c,
- String str,
- QueryCondition.Op op,
- Value v) {
- switch (v.getType()) {
- case BOOLEAN:
- c.is(str, op, v.getBoolean());
- break;
- case STRING:
- c.is(str, op, v.getString());
- break;
- case BYTE:
- c.is(str, op, v.getByte());
- break;
- case SHORT:
- c.is(str, op, v.getShort());
- break;
- case INT:
- c.is(str, op, v.getInt());
- break;
- case LONG:
- c.is(str, op, v.getLong());
- break;
- case FLOAT:
- c.is(str, op, v.getFloat());
- break;
- case DOUBLE:
- c.is(str, op, v.getDouble());
- break;
- case DECIMAL:
- c.is(str, op, v.getDecimal());
- break;
- case DATE:
- c.is(str, op, v.getDate());
- break;
- case TIME:
- c.is(str, op, v.getTime());
- break;
- case TIMESTAMP:
- c.is(str, op, v.getTimestamp());
- break;
- case BINARY:
- c.is(str, op, v.getBinary());
- break;
- // XXX/TODO: Map, Array?
- default:
- break;
- }
- }
-
- private JsonScanSpec createJsonScanSpec(FunctionCall call,
- CompareFunctionsProcessor processor) {
- String functionName = processor.getFunctionName();
- SchemaPath field = processor.getPath();
- Value fieldValue = processor.getValue();
-
- QueryCondition cond = null;
- switch (functionName) {
- case "equal":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.EQUAL, fieldValue);
- cond.build();
- break;
-
- case "not_equal":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.NOT_EQUAL, fieldValue);
- cond.build();
- break;
-
- case "less_than":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.LESS, fieldValue);
- cond.build();
- break;
-
- case "less_than_or_equal_to":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.LESS_OR_EQUAL, fieldValue);
- cond.build();
- break;
-
- case "greater_than":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.GREATER, fieldValue);
- cond.build();
- break;
-
- case "greater_than_or_equal_to":
- cond = MapRDB.newCondition();
- setIsCondition(cond, field.getAsUnescapedPath(), Op.GREATER_OR_EQUAL, fieldValue);
- cond.build();
- break;
-
- case "isnull":
- cond = MapRDB.newCondition().notExists(field.getAsUnescapedPath()).build();
- break;
-
- case "isnotnull":
- cond = MapRDB.newCondition().exists(field.getAsUnescapedPath()).build();
- break;
-
- case "istrue":
- cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.EQUAL, true).build();
- break;
-
- case "isnotfalse":
- cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.NOT_EQUAL, false).build();
- break;
-
- case "isfalse":
- cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.EQUAL, false).build();
- break;
-
- case "isnottrue":
- cond = MapRDB.newCondition().is(field.getAsUnescapedPath(), Op.NOT_EQUAL, true).build();
- break;
-
- case "like":
- cond = MapRDB.newCondition().like(field.getAsUnescapedPath(), fieldValue.getString()).build();
- break;
-
- default:
- }
-
- if (cond != null) {
- return new JsonScanSpec(groupScan.getTableName(), cond);
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonScanSpec.java
deleted file mode 100644
index 7763273..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonScanSpec.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.json;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.ojai.store.QueryCondition;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.db.MapRDB;
-import com.mapr.db.impl.ConditionImpl;
-
-/**
- * Scan specification for a JSON table: the table name plus an optional OJAI
- * {@link QueryCondition} that restricts the documents to read.
- */
-public class JsonScanSpec {
-  protected String tableName;
-  protected QueryCondition condition;
-
-  /**
-   * @param tableName name of the table to scan
-   * @param condition condition restricting the scan; may be {@code null}
-   */
-  @JsonCreator
-  public JsonScanSpec(@JsonProperty("tableName") String tableName,
-                      @JsonProperty("condition") QueryCondition condition) {
-    this.tableName = tableName;
-    this.condition = condition;
-  }
-
-  public String getTableName() {
-    return this.tableName;
-  }
-
-  /**
-   * @return the start row of the condition's first row-key range, or
-   *         {@link HConstants#EMPTY_START_ROW} when there is no condition
-   */
-  public byte[] getStartRow() {
-    return condition == null
-        ? HConstants.EMPTY_START_ROW
-        : ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStartRow();
-  }
-
-  /**
-   * @return the stop row of the condition's first row-key range, or
-   *         {@link HConstants#EMPTY_END_ROW} when there is no condition
-   */
-  public byte[] getStopRow() {
-    return condition == null
-        ? HConstants.EMPTY_END_ROW
-        : ((ConditionImpl)this.condition).getRowkeyRanges().get(0).getStopRow();
-  }
-
-  /**
-   * @return the serialized form of the condition's descriptor, or {@code null} when there
-   *         is no condition
-   */
-  public Object getSerializedFilter() {
-    if (this.condition == null) {
-      return null;
-    }
-    return ((ConditionImpl)this.condition).getDescriptor().getSerialized();
-  }
-
-  public void setCondition(QueryCondition condition) {
-    this.condition = condition;
-  }
-
-  @JsonIgnore
-  public QueryCondition getCondition() {
-    return this.condition;
-  }
-
-  /**
-   * Folds the condition of {@code scanSpec} into this spec.
-   * <p>
-   * If both specs have a condition, they are combined under {@code functionName}
-   * ({@code "booleanAnd"} or {@code "booleanOr"}). If only {@code scanSpec} has one, it is
-   * adopted as is. If {@code scanSpec} has none, this spec is left unchanged.
-   */
-  public void mergeScanSpec(String functionName, JsonScanSpec scanSpec) {
-    final QueryCondition other = scanSpec.getCondition();
-    if (other == null) {
-      // Nothing to merge in.
-      return;
-    }
-    if (this.condition == null) {
-      this.condition = other;
-      return;
-    }
-
-    final QueryCondition combined = MapRDB.newCondition();
-    switch (functionName) {
-    case "booleanAnd":
-      combined.and();
-      break;
-    case "booleanOr":
-      combined.or();
-      break;
-    default:
-      assert(false);
-    }
-
-    combined.condition(this.condition)
-        .condition(other)
-        .close()
-        .build();
-
-    this.condition = combined;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder text = new StringBuilder("JsonScanSpec [tableName=");
-    text.append(tableName);
-    text.append(", condition=");
-    text.append(condition == null ? null : condition.toString());
-    text.append("]");
-    return text.toString();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonSubScanSpec.java
deleted file mode 100644
index aa5375a..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonSubScanSpec.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.json;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
-import org.apache.hadoop.hbase.HConstants;
-import org.ojai.DocumentConstants;
-import org.ojai.Value;
-import org.ojai.store.QueryCondition;
-import org.ojai.store.QueryCondition.Op;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.db.MapRDB;
-import com.mapr.db.impl.ConditionImpl;
-import com.mapr.db.impl.IdCodec;
-
-public class JsonSubScanSpec extends MapRDBSubScanSpec {
-
- protected QueryCondition condition;
-
- @JsonCreator
- public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
- @JsonProperty("regionServer") String regionServer,
- @JsonProperty("startRow") byte[] startRow,
- @JsonProperty("stopRow") byte[] stopRow,
- @JsonProperty("cond") QueryCondition cond) {
- super(tableName, regionServer, null, null, null, null);
-
- this.condition = MapRDB.newCondition().and();
-
- if (cond != null) {
- this.condition.condition(cond);
- }
-
- if (startRow != null &&
- Arrays.equals(startRow, HConstants.EMPTY_START_ROW) == false) {
- Value startVal = IdCodec.decode(startRow);
-
- switch(startVal.getType()) {
- case BINARY:
- this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getBinary());
- break;
- case STRING:
- this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getString());
- break;
- default:
- throw new IllegalStateException("Encountered an unsupported type " + startVal.getType()
- + " for _id");
- }
- }
-
- if (stopRow != null &&
- Arrays.equals(stopRow, HConstants.EMPTY_END_ROW) == false) {
- Value stopVal = IdCodec.decode(stopRow);
-
- switch(stopVal.getType()) {
- case BINARY:
- this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getBinary());
- break;
- case STRING:
- this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getString());
- break;
- default:
- throw new IllegalStateException("Encountered an unsupported type " + stopVal.getType()
- + " for _id");
- }
- }
-
- this.condition.close().build();
- }
-
- public void setCondition(QueryCondition cond) {
- condition = cond;
- }
-
- @JsonIgnore
- public QueryCondition getCondition() {
- return this.condition;
- }
-
- @Override
- public byte[] getSerializedFilter() {
- if (this.condition != null) {
- ByteBuffer bbuf = ((ConditionImpl)this.condition).getDescriptor().getSerialized();
- byte[] serFilter = new byte[bbuf.limit() - bbuf.position()];
- bbuf.get(serFilter);
- return serFilter;
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
deleted file mode 100644
index 6d896aa..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb.json;
-
-import static org.apache.drill.exec.store.maprdb.util.CommonFns.isNullOrEmpty;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin;
-import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig;
-import org.apache.drill.exec.store.maprdb.MapRDBGroupScan;
-import org.apache.drill.exec.store.maprdb.MapRDBSubScan;
-import org.apache.drill.exec.store.maprdb.MapRDBTableStats;
-import org.apache.drill.exec.store.maprdb.TabletFragmentInfo;
-import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.annotate.JsonCreator;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.mapr.db.MapRDB;
-import com.mapr.db.Table;
-import com.mapr.db.TabletInfo;
-import com.mapr.db.impl.TabletInfoImpl;
-
-@JsonTypeName("maprdb-json-scan")
-public class JsonTableGroupScan extends MapRDBGroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableGroupScan.class);
-
- public static final String TABLE_JSON = "json";
-
- private MapRDBTableStats tableStats;
-
- private JsonScanSpec scanSpec;
-
- @JsonCreator
- public JsonTableGroupScan(@JsonProperty("userName") final String userName,
- @JsonProperty("scanSpec") JsonScanSpec scanSpec,
- @JsonProperty("storage") FileSystemConfig storagePluginConfig,
- @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName,
- (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
- scanSpec, columns);
- }
-
- public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
- super(storagePlugin, formatPlugin, columns, userName);
- this.scanSpec = scanSpec;
- init();
- }
-
- /**
- * Private constructor, used for cloning.
- * @param that The HBaseGroupScan to clone
- */
- private JsonTableGroupScan(JsonTableGroupScan that) {
- super(that);
- this.scanSpec = that.scanSpec;
- this.endpointFragmentMapping = that.endpointFragmentMapping;
- this.tableStats = that.tableStats;
- }
-
- @Override
- public GroupScan clone(List<SchemaPath> columns) {
- JsonTableGroupScan newScan = new JsonTableGroupScan(this);
- newScan.columns = columns;
- return newScan;
- }
-
- private void init() {
- logger.debug("Getting tablet locations");
- try {
- Configuration conf = new Configuration();
- Table t = MapRDB.getTable(scanSpec.getTableName());
- TabletInfo[] tabletInfos = t.getTabletInfos(scanSpec.getCondition());
- tableStats = new MapRDBTableStats(conf, scanSpec.getTableName());
-
- boolean foundStartRegion = false;
- regionsToScan = new TreeMap<TabletFragmentInfo, String>();
- for (TabletInfo tabletInfo : tabletInfos) {
- TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
- if (!foundStartRegion
- && !isNullOrEmpty(scanSpec.getStartRow())
- && !tabletInfoImpl.containsRow(scanSpec.getStartRow())) {
- continue;
- }
- foundStartRegion = true;
- regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), tabletInfo.getLocations()[0]);
- if (!isNullOrEmpty(scanSpec.getStopRow())
- && tabletInfoImpl.containsRow(scanSpec.getStopRow())) {
- break;
- }
- }
- } catch (Exception e) {
- throw new DrillRuntimeException("Error getting region info for table: " + scanSpec.getTableName(), e);
- }
- }
-
- protected JsonSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
- // XXX/TODO check filter/Condition
- JsonScanSpec spec = scanSpec;
- JsonSubScanSpec subScanSpec = new JsonSubScanSpec(
- spec.getTableName(),
- regionsToScan.get(tfi),
- (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
- (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
- spec.getCondition());
- return subScanSpec;
- }
-
- @Override
- public MapRDBSubScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < endpointFragmentMapping.size() : String.format(
- "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
- minorFragmentId);
- return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
- endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
- }
-
- @Override
- public ScanStats getScanStats() {
- //TODO: look at stats for this.
- long rowCount = (long) ((scanSpec.getSerializedFilter() != null ? .5 : 1) * tableStats.getNumRows());
- int avgColumnSize = 10;
- int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new JsonTableGroupScan(this);
- }
-
- @JsonIgnore
- public String getTableName() {
- return scanSpec.getTableName();
- }
-
- @Override
- public String toString() {
- return "JsonTableGroupScan [ScanSpec="
- + scanSpec + ", columns="
- + columns + "]";
- }
-
- public JsonScanSpec getScanSpec() {
- return scanSpec;
- }
-}