You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/04 18:55:24 UTC
[02/21] git commit: DRILL-571: Predicate push down into HBase scan
DRILL-571: Predicate push down into HBase scan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/69d90bbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/69d90bbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/69d90bbc
Branch: refs/heads/master
Commit: 69d90bbcd7e274d68bb41c4922e48dd51f44ef2b
Parents: bcc3c73
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Apr 27 22:26:24 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 3 17:54:33 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/DrillHBaseConstants.java | 27 ++++
.../exec/store/hbase/HBaseFilterBuilder.java | 132 +++++++++++++++++++
.../drill/exec/store/hbase/HBaseGroupScan.java | 126 ++++++++++++------
.../store/hbase/HBasePushFilterIntoScan.java | 65 +++++++++
.../exec/store/hbase/HBaseRecordReader.java | 14 +-
.../exec/store/hbase/HBaseScanBatchCreator.java | 4 +-
.../drill/exec/store/hbase/HBaseScanSpec.java | 96 ++++++++++++++
.../exec/store/hbase/HBaseSchemaFactory.java | 4 +-
.../exec/store/hbase/HBaseStoragePlugin.java | 12 +-
.../store/hbase/HBaseStoragePluginConfig.java | 45 +++----
.../drill/exec/store/hbase/HBaseSubScan.java | 99 ++++++++++----
.../drill/exec/store/hbase/HBaseUtils.java | 69 ++++++++++
.../drill/exec/store/hbase/HTableReadEntry.java | 36 -----
.../drill/hbase/HBaseRecordReaderTest.java | 28 +---
.../org/apache/drill/hbase/HBaseTestsSuite.java | 9 +-
.../drill/hbase/TestHBaseFilterPushDown.java | 106 +++++++++++++++
.../apache/drill/hbase/TestTableGenerator.java | 11 +-
.../hbase/hbase_scan_screen_physical.json | 5 +-
...base_scan_screen_physical_column_select.json | 5 +-
...base_scan_screen_physical_family_select.json | 5 +-
.../src/test/resources/storage-plugins.json | 40 ++++++
distribution/src/resources/storage-plugins.json | 8 ++
22 files changed, 765 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
new file mode 100644
index 0000000..7969c45
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import org.apache.drill.common.expression.SchemaPath;
+
+/**
+ * Constants shared by the classes of the HBase storage plugin. Classes gain
+ * unqualified access to them by implementing this interface.
+ */
+public interface DrillHBaseConstants {
+
+  /** Column name under which the HBase row key is referenced in Drill expressions. */
+  String ROW_KEY = "row_key";
+
+  /** {@link #ROW_KEY} expressed as a simple (single segment) schema path. */
+  SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
new file mode 100644
index 0000000..b39ee1d
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
+public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
+
+ private static final ImmutableMap<String, String> RELATIONAL_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ Builder<String, String> builder = ImmutableMap.builder();
+ RELATIONAL_FUNCTIONS_TRANSPOSE_MAP = builder
+ .put("equal", "equal")
+ .put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than")
+ .build();
+ }
+
+ private HBaseScanSpec scanSpec;
+
+ HBaseFilterBuilder(HBaseScanSpec hbaseScanSpec) {
+ this.scanSpec = hbaseScanSpec;
+ }
+
+ static HBaseScanSpec getHBaseScanSpec(HBaseScanSpec hbaseScanSpec, LogicalExpression e) {
+ HBaseFilterBuilder filterBuilder = new HBaseFilterBuilder(hbaseScanSpec);
+ return e.accept(filterBuilder, null);
+ }
+
+ @Override
+ public HBaseScanSpec visitFunctionCall(FunctionCall call, Void filterSet) throws RuntimeException {
+ String functionName = call.getName();
+ ImmutableList<LogicalExpression> args = call.args;
+ if (args.size() == 2 && RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
+ LogicalExpression nameArg = args.get(0);
+ LogicalExpression valueArg = args.get(1);
+ if (nameArg instanceof QuotedString) {
+ valueArg = nameArg;
+ nameArg = args.get(1);
+ functionName = RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+ }
+
+ while (nameArg instanceof CastExpression
+ && nameArg.getMajorType().getMinorType() == MinorType.VARCHAR) {
+ nameArg = ((CastExpression) nameArg).getInput();
+ }
+
+ if (nameArg instanceof FieldReference
+ && ((FieldReference) nameArg).getAsUnescapedPath().equals(ROW_KEY)
+ && valueArg instanceof QuotedString) {
+ return createHBaseScanSpec(functionName , ((QuotedString) valueArg).value.getBytes());
+ }
+ }
+ return null;
+ }
+
+ private HBaseScanSpec createHBaseScanSpec(String functionName, byte[] value) {
+ byte[] startRow = scanSpec.getStartRow();
+ byte[] stopRow = scanSpec.getStopRow();
+ Filter filter = null;
+ switch (functionName) {
+ case "equal":
+ startRow = stopRow = value;
+ break;
+ case "not_equal":
+ filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(value));
+ break;
+ case "greater_than_or_equal_to":
+ startRow = value;
+ filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(value));
+ break;
+ case "greater_than":
+ startRow = value;
+ filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(value));
+ break;
+ case "less_than_or_equal_to":
+ stopRow = Arrays.copyOf(value, value.length+1); // stopRow should be just greater than 'value'
+ filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(value));
+ break;
+ case "less_than":
+ stopRow = value;
+ filter = new RowFilter(CompareOp.LESS, new BinaryComparator(value));
+ break;
+ default:
+ break;
+ }
+ if (filter != null || startRow != scanSpec.getStartRow() || stopRow != scanSpec.getStopRow()) {
+ return new HBaseScanSpec(scanSpec.getTableName(), startRow, stopRow, filter);
+ }
+ return null;
+ }
+
+ @Override
+ public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index f309b3b..311e579 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -18,12 +18,16 @@
package org.apache.drill.exec.store.hbase;
import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -47,75 +51,98 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
@JsonTypeName("hbase-scan")
-public class HBaseGroupScan extends AbstractGroupScan {
+public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
- private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanReadEntry> mappings;
- private Stopwatch watch = new Stopwatch();
-
+ private HBaseStoragePluginConfig storagePluginConfig;
@JsonProperty("storage")
public HBaseStoragePluginConfig getStorageConfig() {
return this.storagePluginConfig;
}
+ private List<SchemaPath> columns;
@JsonProperty
public List<SchemaPath> getColumns() {
return columns;
}
- private String tableName;
+ private HBaseScanSpec hbaseScanSpec;
+ @JsonProperty
+ public HBaseScanSpec getHBaseScanSpec() {
+ return hbaseScanSpec;
+ }
+
+ @JsonIgnore
+ public HBaseStoragePlugin getStoragePlugin() {
+ return storagePlugin;
+ }
+
+ private Stopwatch watch = new Stopwatch();
+ private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings;
private HBaseStoragePlugin storagePlugin;
- private HBaseStoragePluginConfig storagePluginConfig;
private List<EndpointAffinity> endpointAffinities;
- private List<SchemaPath> columns;
-
- private NavigableMap<HRegionInfo,ServerName> regionsMap;
+ private NavigableMap<HRegionInfo,ServerName> regionsToScan;
@JsonCreator
- public HBaseGroupScan(@JsonProperty("entries") List<HTableReadEntry> entries,
- @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry
- )throws IOException, ExecutionSetupException {
- Preconditions.checkArgument(entries.size() == 1);
+ public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
+ @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
this.storagePlugin = (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig);
this.storagePluginConfig = storagePluginConfig;
- this.tableName = entries.get(0).getTableName();
+ this.hbaseScanSpec = hbaseScanSpec;
this.columns = columns;
getRegionInfos();
}
- public HBaseGroupScan(String tableName, HBaseStoragePlugin storageEngine, List<SchemaPath> columns) throws IOException {
+ public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
this.storagePlugin = storageEngine;
this.storagePluginConfig = storageEngine.getConfig();
- this.tableName = tableName;
+ this.hbaseScanSpec = scanSpec;
this.columns = columns;
getRegionInfos();
}
- protected void getRegionInfos() throws IOException {
+ private void getRegionInfos() {
logger.debug("Getting region locations");
- HTable table = new HTable(storagePluginConfig.conf, tableName);
- regionsMap = table.getRegionLocations();
- regionsMap.values().iterator().next().getHostname();
- table.close();
+ try {
+ HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName());
+ NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
+ table.close();
+
+ boolean foundStartRegion = false;
+ regionsToScan = new TreeMap<HRegionInfo, ServerName>();
+ for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
+ HRegionInfo regionInfo = mapEntry.getKey();
+ if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
+ continue;
+ }
+ foundStartRegion = true;
+ regionsToScan.put(regionInfo, mapEntry.getValue());
+ if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
+ }
}
@Override
public List<EndpointAffinity> getOperatorAffinity() {
watch.reset();
watch.start();
- Map<String, DrillbitEndpoint> endpointMap = new HashMap();
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
endpointMap.put(ep.getAddress(), ep);
}
- Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap();
-
- for (ServerName sn : regionsMap.values()) {
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+ for (ServerName sn : regionsToScan.values()) {
String host = sn.getHostname();
DrillbitEndpoint ep = endpointMap.get(host);
if (ep != null) {
@@ -140,24 +167,28 @@ public class HBaseGroupScan extends AbstractGroupScan {
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
watch.reset();
watch.start();
- Preconditions.checkArgument(incomingEndpoints.size() <= regionsMap.size(),
- String.format("Incoming endpoints %d is greater than number of row groups %d", incomingEndpoints.size(), regionsMap.size()));
+ Preconditions.checkArgument(incomingEndpoints.size() <= regionsToScan.size(),
+ String.format("Incoming endpoints %d is greater than number of row groups %d", incomingEndpoints.size(), regionsToScan.size()));
+
mappings = ArrayListMultimap.create();
ArrayListMultimap<String, Integer> incomingEndpointMap = ArrayListMultimap.create();
for (int i = 0; i < incomingEndpoints.size(); i++) {
incomingEndpointMap.put(incomingEndpoints.get(i).getAddress(), i);
}
- Map<String, Iterator<Integer>> mapIterator = new HashMap();
+ Map<String, Iterator<Integer>> mapIterator = new HashMap<String, Iterator<Integer>>();
for (String s : incomingEndpointMap.keySet()) {
Iterator<Integer> ints = Iterators.cycle(incomingEndpointMap.get(s));
mapIterator.put(s, ints);
}
Iterator<Integer> nullIterator = Iterators.cycle(incomingEndpointMap.values());
- for (HRegionInfo regionInfo : regionsMap.keySet()) {
+ for (HRegionInfo regionInfo : regionsToScan.keySet()) {
logger.debug("creating read entry. start key: {} end key: {}", Bytes.toStringBinary(regionInfo.getStartKey()), Bytes.toStringBinary(regionInfo.getEndKey()));
- HBaseSubScan.HBaseSubScanReadEntry p = new HBaseSubScan.HBaseSubScanReadEntry(
- tableName, Bytes.toStringBinary(regionInfo.getStartKey()), Bytes.toStringBinary(regionInfo.getEndKey()));
- String host = regionsMap.get(regionInfo).getHostname();
+ HBaseSubScan.HBaseSubScanSpec p = new HBaseSubScan.HBaseSubScanSpec()
+ .setTableName(hbaseScanSpec.getTableName())
+ .setStartRow((hbaseScanSpec.getStartRow() != null && regionInfo.containsRow(hbaseScanSpec.getStartRow())) ? hbaseScanSpec.getStartRow() : regionInfo.getStartKey())
+ .setStopRow((hbaseScanSpec.getStopRow() != null && regionInfo.containsRow(hbaseScanSpec.getStopRow())) ? hbaseScanSpec.getStopRow() : regionInfo.getEndKey())
+ .setSerializedFilter(hbaseScanSpec.getSerializedFilter());
+ String host = regionsToScan.get(regionInfo).getHostname();
Iterator<Integer> indexIterator = mapIterator.get(host);
if (indexIterator == null) {
indexIterator = nullIterator;
@@ -174,19 +205,23 @@ public class HBaseGroupScan extends AbstractGroupScan {
@Override
public int getMaxParallelizationWidth() {
- return regionsMap.size();
+ return regionsToScan.size();
}
@Override
public OperatorCost getCost() {
//TODO Figure out how to properly calculate cost
- return new OperatorCost(1,1,1,1);
+ return new OperatorCost(regionsToScan.size(), // network
+ 1, // disk
+ 1, // memory
+ 1); // cpu
}
@Override
public Size getSize() {
// TODO - this is wrong, need to populate correctly
- return new Size(10,10);
+ int size = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size();
+ return new Size(size, size);
}
@Override
@@ -202,4 +237,11 @@ public class HBaseGroupScan extends AbstractGroupScan {
return toString();
}
+ @Override
+ public String toString() {
+ return "HBaseGroupScan [HBaseScanSpec="
+ + hbaseScanSpec + ", columns="
+ + columns + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
new file mode 100644
index 0000000..137f6fe
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hbase;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexNode;
+
+/**
+ * Optimizer rule that matches a {@link FilterPrel} sitting on top of a {@link ScanPrel}
+ * backed by an {@link HBaseGroupScan}. When the filter condition can be converted into an
+ * {@link HBaseScanSpec}, the matched pair is transformed into a scan over that specification.
+ */
+public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
+  public static final StoragePluginOptimizerRule INSTANCE = new HBasePushFilterIntoScan();
+
+  private HBasePushFilterIntoScan() {
+    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "HBasePushFilterIntoScan");
+  }
+
+  /**
+   * Converts the filter condition and, if conversion succeeds, replaces the matched
+   * filter/scan pair with a scan built from the converted specification.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scanRel = (ScanPrel) call.rel(1);
+    final FilterPrel filterRel = (FilterPrel) call.rel(0);
+    final RexNode rexCondition = filterRel.getCondition();
+
+    final HBaseGroupScan currentScan = (HBaseGroupScan) scanRel.getGroupScan();
+    final LogicalExpression drillCondition = DrillOptiq.toDrill(new DrillParseContext(), scanRel, rexCondition);
+    final HBaseScanSpec pushedSpec = HBaseFilterBuilder.getHBaseScanSpec(currentScan.getHBaseScanSpec(), drillCondition);
+
+    // A null specification means the condition could not be converted: leave the plan untouched.
+    if (pushedSpec != null) {
+      final GroupScan pushedScan =
+          new HBaseGroupScan(currentScan.getStoragePlugin(), pushedSpec, currentScan.getColumns());
+      call.transformTo(ScanPrel.create(scanRel, filterRel.getTraitSet(), pushedScan));
+    }
+  }
+
+  /** Restricts the rule to scans whose group scan is an {@link HBaseGroupScan}. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scanRel = (ScanPrel) call.rel(1);
+    return scanRel.getGroupScan() instanceof HBaseGroupScan && super.matches(call);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 8e1e0ac..3694b53 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
@@ -56,12 +55,10 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
-public class HBaseRecordReader implements RecordReader {
+public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
- private static final String ROW_KEY = "row_key";
private static final int TARGET_RECORD_COUNT = 4000;
- private static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
private List<SchemaPath> columns;
private OutputMutator outputMutator;
@@ -74,9 +71,10 @@ public class HBaseRecordReader implements RecordReader {
private SchemaPath rowKeySchemaPath;
private HTable table;
- public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanReadEntry e, List<SchemaPath> columns, FragmentContext context) {
+ public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec e, List<SchemaPath> columns, FragmentContext context) {
this.columns = columns;
- this.scan = new Scan(Bytes.toBytesBinary(e.getStartRow()), Bytes.toBytesBinary(e.getEndRow()));
+ this.scan = new Scan(e.getStartRow(), e.getStopRow());
+ this.scan.setFilter(e.getScanFilter());
this.context = context;
if (columns != null && columns.size() != 0) {
for (SchemaPath column : columns) {
@@ -116,10 +114,11 @@ public class HBaseRecordReader implements RecordReader {
}
@Override
+ @SuppressWarnings("deprecation")
public void setup(OutputMutator output) throws ExecutionSetupException {
this.outputMutator = output;
output.removeAllFields();
- vvMap = new HashMap();
+ vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
// Add Vectors to output in the order specified when creating reader
for (SchemaPath column : columns) {
@@ -211,6 +210,7 @@ public class HBaseRecordReader implements RecordReader {
return TARGET_RECORD_COUNT;
}
+ @SuppressWarnings("deprecation")
private NullableVarBinaryVector addNewVector(String column) {
MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY));
NullableVarBinaryVector v = new NullableVarBinaryVector(field, context.getAllocator());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 157d84a..7e38f5f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -37,8 +37,8 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
- Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).conf;
- for(HBaseSubScan.HBaseSubScanReadEntry e : subScan.getRowGroupReadEntries()){
+ Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).getHBaseConf();
+ for(HBaseSubScan.HBaseSubScanSpec e : subScan.getRegionScanSpecList()){
try {
readers.add(
new HBaseRecordReader(config, e, subScan.getColumns(), context)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
new file mode 100644
index 0000000..c2ee723
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Describes what to read from an HBase table: the table name, an optional start and stop
+ * row, and an optional HBase {@link Filter}. For JSON the filter can be supplied either in
+ * serialized form or as a filter string, but not both.
+ */
+public class HBaseScanSpec {
+
+  protected String tableName;
+  protected byte[] startRow;
+  protected byte[] stopRow;
+
+  protected Filter filter;
+
+  /**
+   * JSON constructor.
+   *
+   * @param serializedFilter filter in serialized form, may be {@code null}
+   * @param filterString filter in string form, may be {@code null}
+   * @throws IllegalArgumentException if both filter representations are given
+   */
+  @JsonCreator
+  public HBaseScanSpec(@JsonProperty("tableName") String tableName,
+                       @JsonProperty("startRow") byte[] startRow,
+                       @JsonProperty("stopRow") byte[] stopRow,
+                       @JsonProperty("serializedFilter") byte[] serializedFilter,
+                       @JsonProperty("filterString") String filterString) {
+    this(tableName, startRow, stopRow, resolveFilter(serializedFilter, filterString));
+  }
+
+  /** Creates a specification from already materialized parts; any of the last three may be {@code null}. */
+  public HBaseScanSpec(String tableName, byte[] startRow, byte[] stopRow, Filter filter) {
+    this.tableName = tableName;
+    this.startRow = startRow;
+    this.stopRow = stopRow;
+    this.filter = filter;
+  }
+
+  /** Creates a specification for the given table with no row range and no filter. */
+  public HBaseScanSpec(String tableName) {
+    this(tableName, null, null, null);
+  }
+
+  /**
+   * Picks the filter from whichever representation was supplied, preferring the filter
+   * string and otherwise handing the serialized form (possibly {@code null}) to HBaseUtils.
+   */
+  private static Filter resolveFilter(byte[] serializedFilter, String filterString) {
+    if (serializedFilter != null && filterString != null) {
+      throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
+    }
+    if (filterString != null) {
+      return HBaseUtils.parseFilterString(filterString);
+    }
+    return HBaseUtils.deserializeFilter(serializedFilter);
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** Returns the start row array held by this object (not a copy); may be {@code null}. */
+  public byte[] getStartRow() {
+    return startRow;
+  }
+
+  /** Returns the stop row array held by this object (not a copy); may be {@code null}. */
+  public byte[] getStopRow() {
+    return stopRow;
+  }
+
+  @JsonIgnore
+  public Filter getFilter() {
+    return this.filter;
+  }
+
+  /** Returns the filter in serialized form, or {@code null} when no filter is set. */
+  public byte[] getSerializedFilter() {
+    if (this.filter == null) {
+      return null;
+    }
+    return HBaseUtils.serializeFilter(this.filter);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("HBaseScanSpec [tableName=").append(tableName);
+    text.append(", startRow=").append(startRow == null ? null : Bytes.toStringBinary(startRow));
+    text.append(", stopRow=").append(stopRow == null ? null : Bytes.toStringBinary(stopRow));
+    text.append(", filter=").append(filter == null ? null : filter.toString());
+    return text.append("]").toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 991685c..7f67f83 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -73,13 +73,13 @@ public class HBaseSchemaFactory implements SchemaFactory {
@Override
public DrillTable getTable(String name) {
- Object selection = new HTableReadEntry(name);
+ Object selection = new HBaseScanSpec(name);
return new DynamicDrillTable(plugin, schemaName, selection, plugin.getConfig());
}
@Override
public Set<String> getTableNames() {
- try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().conf)) {
+ try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().getHBaseConf())) {
HTableDescriptor[] tables = admin.listTables();
Set<String> tableNames = Sets.newHashSet();
for (HTableDescriptor table : tables) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index a82c6c3..0ecc379 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.hbase;
import java.io.IOException;
+import java.util.Set;
import net.hydromatic.optiq.SchemaPlus;
@@ -25,9 +26,11 @@ import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
public class HBaseStoragePlugin extends AbstractStoragePlugin {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePlugin.class);
@@ -56,9 +59,8 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
@Override
public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
- HTableReadEntry readEntry = selection.getListWith(new ObjectMapper(),
- new TypeReference<HTableReadEntry>() {});
- return new HBaseGroupScan(readEntry.getTableName(), this, null);
+ HBaseScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<HBaseScanSpec>() {});
+ return new HBaseGroupScan(this, scanSpec, null);
}
@Override
@@ -71,4 +73,8 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
return engineConfig;
}
+ /**
+  * Returns the optimizer rules contributed by this plugin: an immutable
+  * single-element set holding {@code HBasePushFilterIntoScan.INSTANCE}.
+  */
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+   return ImmutableSet.<StoragePluginOptimizerRule>of(HBasePushFilterIntoScan.INSTANCE);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index a5dfc35..7eba917 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -20,56 +20,57 @@ package org.apache.drill.exec.store.hbase;
import org.apache.drill.common.logical.StoragePluginConfigBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.annotation.JsonIgnore;
@JsonTypeName("hbase")
public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
- @JsonIgnore
- public Configuration conf;
-
@JsonProperty
public String zookeeperQuorum;
+
@JsonProperty
public int zookeeperPort;
+ private Configuration hbaseConf;
+ private HConnectionKey hbaseConfKey;
+
@JsonCreator
public HBaseStoragePluginConfig(@JsonProperty("zookeeperQuorum") String zookeeperQuorum,
@JsonProperty("zookeeperPort") int zookeeperPort) {
this.zookeeperQuorum = zookeeperQuorum;
this.zookeeperPort = zookeeperPort;
- conf = HBaseConfiguration.create();
+
+ this.hbaseConf = HBaseConfiguration.create();
if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
- conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
- conf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
+ hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
}
+ this.hbaseConfKey = new HConnectionKey(hbaseConf);
}
- /*
- @JsonIgnore
- public Configuration getConf() {
- return conf;
- }
- */
-
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
HBaseStoragePluginConfig that = (HBaseStoragePluginConfig) o;
-
- if (conf != null ? !conf.equals(that.conf) : that.conf != null) return false;
-
- return true;
+ return this.hbaseConfKey.equals(that.hbaseConfKey);
}
@Override
public int hashCode() {
- return conf != null ? conf.hashCode() : 0;
+ return this.hbaseConfKey != null ? this.hbaseConfKey.hashCode() : 0;
+ }
+
+ @JsonIgnore
+ public Configuration getHBaseConf() {
+ return hbaseConf;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 81a8af5..ceaf23f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.OperatorCost;
@@ -31,6 +30,8 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.hadoop.hbase.filter.Filter;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -39,10 +40,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-// Class containing information for reading a single HBase row group form HDFS
-@JsonTypeName("hbase-row-group-scan")
+// Class containing information for reading a single HBase region
+@JsonTypeName("hbase-region-scan")
public class HBaseSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
@@ -50,30 +50,30 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
public final StoragePluginConfig storage;
@JsonIgnore
private final HBaseStoragePlugin hbaseStoragePlugin;
- private final List<HBaseSubScanReadEntry> rowGroupReadEntries;
+ private final List<HBaseSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
@JsonCreator
- public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("storage") StoragePluginConfig storage,
- @JsonProperty("rowGroupReadEntries") LinkedList<HBaseSubScanReadEntry> rowGroupReadEntries,
+ public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("storage") StoragePluginConfig storage,
+ @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
- this.rowGroupReadEntries = rowGroupReadEntries;
+ this.regionScanSpecList = regionScanSpecList;
this.storage = storage;
this.columns = columns;
}
public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
- List<HBaseSubScanReadEntry> regionInfoList,
- List<SchemaPath> columns) {
+ List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
hbaseStoragePlugin = plugin;
storage = config;
- this.rowGroupReadEntries = regionInfoList;
+ this.regionScanSpecList = regionInfoList;
this.columns = columns;
}
- public List<HBaseSubScanReadEntry> getRowGroupReadEntries() {
- return rowGroupReadEntries;
+ public List<HBaseSubScanSpec> getRegionScanSpecList() {
+ return regionScanSpecList;
}
@JsonIgnore
@@ -113,7 +113,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, rowGroupReadEntries, columns);
+ return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, regionScanSpecList, columns);
}
@Override
@@ -121,31 +121,82 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
return Iterators.emptyIterator();
}
- public static class HBaseSubScanReadEntry {
+ public static class HBaseSubScanSpec {
- private String tableName;
- private String startRow;
- private String endRow;
+ protected String tableName;
+ protected byte[] startRow;
+ protected byte[] stopRow;
+ protected byte[] serializedFilter;
@parquet.org.codehaus.jackson.annotate.JsonCreator
- public HBaseSubScanReadEntry(@JsonProperty("tableName") String tableName,
- @JsonProperty("startRow") String startRow, @JsonProperty("endRow") String endRow) {
+ public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("startRow") byte[] startRow,
+ @JsonProperty("stopRow") byte[] stopRow,
+ @JsonProperty("serializedFilter") byte[] serializedFilter,
+ @JsonProperty("filterString") String filterString) {
+ if (serializedFilter != null && filterString != null) {
+ throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
+ }
this.tableName = tableName;
this.startRow = startRow;
- this.endRow = endRow;
+ this.stopRow = stopRow;
+ if (serializedFilter != null) {
+ this.serializedFilter = serializedFilter;
+ } else {
+ this.serializedFilter = HBaseUtils.serializeFilter(HBaseUtils.parseFilterString(filterString));
+ }
+ }
+
+ /* package */ HBaseSubScanSpec() {
+ // empty constructor, to be used with builder pattern;
+ }
+
+ @JsonIgnore
+ private Filter scanFilter;
+ public Filter getScanFilter() {
+ if (scanFilter == null && serializedFilter != null) {
+ scanFilter = HBaseUtils.deserializeFilter(serializedFilter);
+ }
+ return scanFilter;
}
public String getTableName() {
return tableName;
}
- public String getStartRow() {
+ public HBaseSubScanSpec setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public byte[] getStartRow() {
return startRow;
}
- public String getEndRow() {
- return endRow;
+ public HBaseSubScanSpec setStartRow(byte[] startRow) {
+ this.startRow = startRow;
+ return this;
}
+
+ public byte[] getStopRow() {
+ return stopRow;
+ }
+
+ public HBaseSubScanSpec setStopRow(byte[] stopRow) {
+ this.stopRow = stopRow;
+ return this;
+ }
+
+ public byte[] getSerializedFilter() {
+ return serializedFilter;
+ }
+
+ public HBaseSubScanSpec setSerializedFilter(byte[] serializedFilter) {
+ this.serializedFilter = serializedFilter;
+ this.scanFilter = null;
+ return this;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
new file mode 100644
index 0000000..bebdda7
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Static helpers for converting HBase {@link Filter} objects to and from their
+ * filter-string and byte-array forms.
+ */
+public class HBaseUtils {
+  // NOTE(review): the identifier is misspelled ("PARSEER"). It is kept unchanged
+  // because it is package-visible and may be referenced from other classes.
+  static final ParseFilter FILTER_PARSEER = new ParseFilter();
+
+  /**
+   * Converts a string to bytes with {@link Bytes#toBytes(String)}.
+   *
+   * @param str the string to convert, may be {@code null}
+   * @return the converted bytes, or {@link HConstants#EMPTY_BYTE_ARRAY} when
+   *         {@code str} is {@code null}
+   */
+  public static byte[] getBytes(String str) {
+    return str == null ? HConstants.EMPTY_BYTE_ARRAY : Bytes.toBytes(str);
+  }
+
+  /**
+   * Parses a filter string with the shared {@link ParseFilter} instance.
+   *
+   * @param filterString the filter expression, may be {@code null}
+   * @return the parsed filter, or {@code null} when {@code filterString} is {@code null}
+   * @throws DrillRuntimeException wrapping the {@link CharacterCodingException}
+   *         raised by the parser
+   */
+  static Filter parseFilterString(String filterString) {
+    if (filterString == null) {
+      return null;
+    }
+    try {
+      return FILTER_PARSEER.parseFilterString(filterString);
+    } catch (CharacterCodingException e) {
+      throw new DrillRuntimeException("Error parsing filter string: " + filterString, e);
+    }
+  }
+
+  /**
+   * Writes a filter to a byte array with {@link HbaseObjectWritable#writeObject}.
+   *
+   * @param filter the filter to serialize, may be {@code null}
+   * @return the serialized bytes, or {@code null} when {@code filter} is {@code null}
+   * @throws DrillRuntimeException wrapping any {@link IOException} raised while writing
+   */
+  public static byte[] serializeFilter(Filter filter) {
+    if (filter == null) {
+      return null;
+    }
+    try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(byteStream)) {
+      HbaseObjectWritable.writeObject(out, filter, filter.getClass(), null);
+      return byteStream.toByteArray();
+    } catch (IOException e) {
+      throw new DrillRuntimeException("Error serializing filter: " + filter, e);
+    }
+  }
+
+  /**
+   * Reads a filter back from bytes with {@link HbaseObjectWritable#readObject}.
+   *
+   * @param filterBytes the serialized filter, may be {@code null}
+   * @return the filter, or {@code null} when {@code filterBytes} is {@code null}
+   * @throws DrillRuntimeException wrapping any exception raised while reading
+   *         or casting the result
+   */
+  public static Filter deserializeFilter(byte[] filterBytes) {
+    if (filterBytes == null) {
+      return null;
+    }
+    try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(filterBytes))) {
+      return (Filter) HbaseObjectWritable.readObject(dis, null);
+    } catch (Exception e) {
+      // Concatenating the array itself would only print its identity string
+      // (e.g. "[B@1a2b3c"); render the actual bytes so the failure is diagnosable.
+      throw new DrillRuntimeException(
+          "Error deserializing filter: " + Bytes.toStringBinary(filterBytes), e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HTableReadEntry.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HTableReadEntry.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HTableReadEntry.java
deleted file mode 100644
index e18b28d..0000000
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HTableReadEntry.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hbase;
-
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class HTableReadEntry {
-
- protected String tableName;
-
- public String getTableName() {
- return tableName;
- }
-
- @JsonCreator
- public HTableReadEntry(@JsonProperty("tableName") String tableName) {
- this.tableName = tableName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
index 8a476c3..e76d867 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
@@ -17,47 +17,23 @@
*/
package org.apache.drill.hbase;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import mockit.Injectable;
-import mockit.NonStrictExpectations;
+import java.util.List;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
-import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.pop.PopUnitTestBase;
-import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.UserProtos;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.hbase.HBaseRecordReader;
-import org.apache.drill.exec.store.hbase.HBaseSubScan;
import org.apache.drill.exec.util.VectorUtil;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.List;
-
public class HBaseRecordReaderTest extends PopUnitTestBase {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(HBaseRecordReaderTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index df6a98e..3039d39 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -18,6 +18,7 @@
package org.apache.drill.hbase;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -25,10 +26,7 @@ import org.apache.drill.common.util.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapred.TestTableInputFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -39,11 +37,11 @@ import org.junit.runners.Suite.SuiteClasses;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-
@RunWith(Suite.class)
@SuiteClasses({HBaseRecordReaderTest.class})
public class HBaseTestsSuite {
private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
+ private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
private static Configuration conf;
@@ -55,6 +53,9 @@ public class HBaseTestsSuite {
conf = HBaseConfiguration.create();
}
conf.set("hbase.zookeeper.property.clientPort", "2181");
+ if (IS_DEBUG) {
+ conf.set("hbase.regionserver.lease.period","1000000");
+ }
LOG.info("Starting HBase mini cluster.");
if (UTIL == null) {
UTIL = new HBaseTestingUtility(conf);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
new file mode 100644
index 0000000..1911078
--- /dev/null
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import java.util.List;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.util.VectorUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Runs row-key predicates through SQL against a generated HBase table and checks
+ * the number of rows returned.
+ */
+@Ignore // Need to find a way to pass zookeeper port to HBase storage plugin configuration before enabling this test
+public class TestHBaseFilterPushDown extends BaseTestQuery {
+  private static final String TABLE_NAME = "TestTable1";
+
+  private static HBaseAdmin admin;
+  private static Configuration conf = HBaseConfiguration.create();
+
+  /** Connects to HBase on client port 2181 and (re)creates the test table. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.set("hbase.zookeeper.property.clientPort", "2181");
+    admin = new HBaseAdmin(conf);
+    TestTableGenerator.generateHBaseTable(admin, TABLE_NAME, 2, 1000);
+  }
+
+  /** Drops the test table and closes the admin client. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    System.out.println("TestHBaseFilterPushDown: tearDownAfterClass()");
+    if (admin == null) {
+      return; // set-up failed before the admin client was created
+    }
+    try {
+      admin.disableTable(TABLE_NAME);
+      admin.deleteTable(TABLE_NAME);
+    } finally {
+      admin.close(); // the client was previously never closed
+    }
+  }
+
+  @Test
+  public void testFilterPushDownRowKeyEqual() throws Exception {
+    verify("SELECT\n"
+        + " tableName.*\n"
+        + "FROM\n"
+        + " hbase.`[TABLE_NAME]` tableName\n"
+        + " WHERE tableName.row_key = 'b4'"
+        , 1);
+  }
+
+  @Test
+  public void testFilterPushDownRowKeyGreaterThan() throws Exception {
+    verify("SELECT\n"
+        + " tableName.*\n"
+        + "FROM\n"
+        + " hbase.`[TABLE_NAME]` tableName\n"
+        + " WHERE tableName.row_key > 'b4'"
+        , 2);
+  }
+
+  @Test
+  public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
+    verify("SELECT\n"
+        + " tableName.*\n"
+        + "FROM\n"
+        + " hbase.`[TABLE_NAME]` tableName\n"
+        + " WHERE 'b4' >= tableName.row_key"
+        , 4);
+  }
+
+  /**
+   * Runs {@code sql} (with {@code [TABLE_NAME]} replaced by the test table name),
+   * prints the returned batches and asserts the total row count.
+   *
+   * Row counting stops at the first batch that loads zero records, as before,
+   * but every returned batch is now released, including that empty batch and
+   * any batches after it.
+   *
+   * @param sql              query text containing the {@code [TABLE_NAME]} placeholder
+   * @param expectedRowCount number of rows the query is expected to return
+   */
+  protected void verify(String sql, int expectedRowCount) throws Exception {
+    sql = sql.replace("[TABLE_NAME]", TABLE_NAME);
+    List<QueryResultBatch> results = testSqlWithResults(sql);
+
+    int rowCount = 0;
+    boolean sawEmptyBatch = false;
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for (QueryResultBatch result : results) {
+      try {
+        if (sawEmptyBatch) {
+          continue; // counting already stopped; this batch is only released
+        }
+        rowCount += result.getHeader().getRowCount();
+        loader.load(result.getHeader().getDef(), result.getData());
+        if (loader.getRecordCount() <= 0) {
+          sawEmptyBatch = true;
+        } else {
+          VectorUtil.showVectorAccessibleContent(loader, 8);
+        }
+        loader.clear();
+      } finally {
+        result.release();
+      }
+    }
+    System.out.println("Total record count: " + rowCount);
+    Assert.assertEquals(expectedRowCount, rowCount);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index 2207f1d..76f100e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -19,7 +19,6 @@ package org.apache.drill.hbase;
import java.util.Arrays;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -36,15 +35,18 @@ public class TestTableGenerator {
public static void generateHBaseTable(HBaseAdmin admin, String tableName, int numberRegions,
int recordsPerRegion) throws Exception {
- Configuration conf = admin.getConfiguration();
-
+ if (admin.tableExists(tableName)) {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+
byte[][] splitKeys = Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1);
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
desc.addFamily(new HColumnDescriptor("f2"));
admin.createTable(desc, splitKeys);
- HTable table = new HTable(HBaseTestsSuite.getConf(), tableName);
+ HTable table = new HTable(admin.getConfiguration(), tableName);
Put p = new Put("a1".getBytes());
p.add("f".getBytes(), "c1".getBytes(), "1".getBytes());
p.add("f".getBytes(), "c2".getBytes(), "2".getBytes());
@@ -94,6 +96,7 @@ public class TestTableGenerator {
p.add("f2".getBytes(), "c9".getBytes(), "6".getBytes());
table.put(p);
table.flushCommits();
+ table.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
index ae42d6b..60d314b 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
@@ -9,10 +9,9 @@
graph : [ {
pop : "hbase-scan",
@id : 1,
- entries : [
- {
+ hbaseScanSpec : {
tableName : "testTable"
- }],
+ },
storage:
{
"type":"hbase",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index 7940c65..13a2982 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -9,10 +9,9 @@
graph : [ {
pop : "hbase-scan",
@id : 1,
- entries : [
- {
+ hbaseScanSpec : {
tableName : "testTable"
- }],
+ },
storage:
{
"type":"hbase",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
index 2dcc81c..c68fef3 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
@@ -9,10 +9,9 @@
graph : [ {
pop : "hbase-scan",
@id : 1,
- entries : [
- {
+ hbaseScanSpec : {
tableName : "testTable"
- }],
+ },
storage:
{
"type":"hbase",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/contrib/storage-hbase/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/storage-plugins.json b/contrib/storage-hbase/src/test/resources/storage-plugins.json
new file mode 100644
index 0000000..160583a
--- /dev/null
+++ b/contrib/storage-hbase/src/test/resources/storage-plugins.json
@@ -0,0 +1,40 @@
+{
+ "storage":{
+ dfs: {
+ type: "file",
+ connection: "file:///",
+ formats: {
+ "psv" : {
+ type: "text",
+ extensions: [ "tbl" ],
+ delimiter: "|"
+ },
+ "csv" : {
+ type: "text",
+ extensions: [ "csv" ],
+ delimiter: ","
+ },
+ "tsv" : {
+ type: "text",
+ extensions: [ "tsv" ],
+ delimiter: "\t"
+ },
+ "parquet" : {
+ type: "parquet"
+ },
+ "json" : {
+ type: "json"
+ }
+ }
+ },
+ cp: {
+ type: "file",
+ connection: "classpath:///"
+ },
+ hbase : {
+ type:"hbase",
+ zookeeperQuorum : "localhost",
+ zookeeperPort : 2181
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69d90bbc/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
index 6f2c015..a2a9f8e 100644
--- a/distribution/src/resources/storage-plugins.json
+++ b/distribution/src/resources/storage-plugins.json
@@ -45,5 +45,13 @@
}
}
*/
+
+ /*,
+ hbase : {
+ type:"hbase",
+ zookeeperQuorum : "localhost",
+ zookeeperPort : 2181
+ }
+ */
}
}