You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:25 UTC
[02/11] git commit: DRILL-683: Qualify HBase scan with specified columns even if row_key is required.
DRILL-683: Qualify HBase scan with specified columns even if row_key is required.
+ Added some log messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/49d53335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/49d53335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/49d53335
Branch: refs/heads/master
Commit: 49d533355e6cb98c623f68c3b90331c62ecbc270
Parents: e602b2a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 02:19:36 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:45:59 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/DrillHBaseConstants.java | 2 ++
.../exec/store/hbase/HBaseRecordReader.java | 21 ++++++--------------
.../exec/store/hbase/HBaseScanBatchCreator.java | 7 +++----
.../store/hbase/HBaseStoragePluginConfig.java | 12 +++++++----
.../drill/exec/store/hbase/HBaseSubScan.java | 8 ++++----
...base_scan_screen_physical_column_select.json | 2 +-
6 files changed, 24 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index 7969c45..a86797b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -24,4 +24,6 @@ public interface DrillHBaseConstants {
static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+ static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index af059f5..381cd6a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -43,6 +41,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
@@ -70,7 +68,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
private Result leftOver;
private VarBinaryVector rowKeyVector;
private SchemaPath rowKeySchemaPath;
- private HTable table;
public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
@@ -110,15 +107,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
try {
- if (rowKeySchemaPath != null) {
- /* if ROW_KEY was requested, we can not qualify the scan with columns,
- * otherwise HBase will omit the entire row of all of the specified columns do
- * not exist for that row. Eventually we may want to use Family and/or Qualifier
- * Filters in such case but that would mean additional processing at server.
- */
- scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR));
- }
-
Filter scanFilter = subScanSpec.getScanFilter();
if (rowKeyOnly) {
/* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
@@ -134,12 +122,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
scan.setFilter(scanFilter);
scan.setCaching(TARGET_RECORD_COUNT);
- table = new HTable(conf, subScanSpec.getTableName());
+ logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+ subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM),
+ conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ HTable table = new HTable(conf, subScanSpec.getTableName());
resultScanner = table.getScanner(scan);
try {
table.close();
} catch (IOException e) {
- logger.warn("Failure while closing HBase table", e);
+ logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e);
}
} catch (IOException e1) {
throw new DrillRuntimeException(e1);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 0a4eabe..661e1b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,11 +36,10 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
- Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).getHBaseConf();
- for(HBaseSubScan.HBaseSubScanSpec e : subScan.getRegionScanSpecList()){
+ for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
readers.add(
- new HBaseRecordReader(config, e, subScan.getColumns(), context)
+ new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, subScan.getColumns(), context)
);
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
@@ -49,4 +47,5 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
}
return new ScanBatch(subScan, context, readers.iterator());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index b6ff069..5a434d6 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
import org.apache.drill.common.logical.StoragePluginConfigBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,7 +30,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
@JsonTypeName("hbase")
-public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
+public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
@JsonProperty
public String zookeeperQuorum;
@@ -47,9 +49,11 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
this.zookeeperPort = zookeeperPort;
this.hbaseConf = HBaseConfiguration.create();
+ logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
+ zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
- hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
+ hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
}
this.hbaseConfKey = new HConnectionKey(hbaseConf);
}
@@ -79,7 +83,7 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
@VisibleForTesting
public void setZookeeperPort(int zookeeperPort) {
this.zookeeperPort = zookeeperPort;
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6b87817..3f20087 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -48,7 +48,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
@JsonProperty
- public final StoragePluginConfig storage;
+ public final HBaseStoragePluginConfig storage;
@JsonIgnore
private final HBaseStoragePlugin hbaseStoragePlugin;
private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -61,7 +61,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
- this.storage = storage;
+ this.storage = (HBaseStoragePluginConfig) storage;
this.columns = columns;
}
@@ -78,7 +78,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
}
@JsonIgnore
- public StoragePluginConfig getStorageConfig() {
+ public HBaseStoragePluginConfig getStorageConfig() {
return storage;
}
@@ -114,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, regionScanSpecList, columns);
+ return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c64dc97..dc08031 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -19,7 +19,7 @@
"zookeeperPort" : 2181
},
columns: [
- "`f2`.c1", "`f2`.c2"
+ "`f2`.c1", "`f2`.c2", "row_key"
]
},
{