You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:25 UTC

[02/11] git commit: DRILL-683: Qualify HBase scan with specified columns even if row_key is required.

DRILL-683: Qualify HBase scan with specified columns even if row_key is required.

 + Added log messages reporting the HBase ZooKeeper quorum/port/znode settings and naming the table when closing it fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/49d53335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/49d53335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/49d53335

Branch: refs/heads/master
Commit: 49d533355e6cb98c623f68c3b90331c62ecbc270
Parents: e602b2a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 02:19:36 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:45:59 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/DrillHBaseConstants.java   |  2 ++
 .../exec/store/hbase/HBaseRecordReader.java     | 21 ++++++--------------
 .../exec/store/hbase/HBaseScanBatchCreator.java |  7 +++----
 .../store/hbase/HBaseStoragePluginConfig.java   | 12 +++++++----
 .../drill/exec/store/hbase/HBaseSubScan.java    |  8 ++++----
 ...base_scan_screen_physical_column_select.json |  2 +-
 6 files changed, 24 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index 7969c45..a86797b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -24,4 +24,6 @@ public interface DrillHBaseConstants {
 
   static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
 
+  static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index af059f5..381cd6a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -43,6 +41,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
@@ -70,7 +68,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private Result leftOver;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
-  private HTable table;
 
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
@@ -110,15 +107,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
 
     try {
-      if (rowKeySchemaPath != null) {
-        /* if ROW_KEY was requested, we can not qualify the scan with columns,
-         * otherwise HBase will omit the entire row of all of the specified columns do
-         * not exist for that row. Eventually we may want to use Family and/or Qualifier
-         * Filters in such case but that would mean additional processing at server.
-         */
-        scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR));
-      }
-
       Filter scanFilter = subScanSpec.getScanFilter();
       if (rowKeyOnly) {
         /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
@@ -134,12 +122,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       scan.setFilter(scanFilter);
       scan.setCaching(TARGET_RECORD_COUNT);
 
-      table = new HTable(conf, subScanSpec.getTableName());
+      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+                   subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM),
+                   conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      HTable table = new HTable(conf, subScanSpec.getTableName());
       resultScanner = table.getScanner(scan);
       try {
         table.close();
       } catch (IOException e) {
-        logger.warn("Failure while closing HBase table", e);
+        logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e);
       }
     } catch (IOException e1) {
       throw new DrillRuntimeException(e1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 0a4eabe..661e1b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -37,11 +36,10 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
   public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
-    Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).getHBaseConf();
-    for(HBaseSubScan.HBaseSubScanSpec e : subScan.getRegionScanSpecList()){
+    for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {
         readers.add(
-            new HBaseRecordReader(config, e, subScan.getColumns(), context)
+            new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, subScan.getColumns(), context)
         );
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
@@ -49,4 +47,5 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
     }
     return new ScanBatch(subScan, context, readers.iterator());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index b6ff069..5a434d6 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,7 +30,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 
 @JsonTypeName("hbase")
-public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
+public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
 
   @JsonProperty
   public String zookeeperQuorum;
@@ -47,9 +49,11 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
     this.zookeeperPort = zookeeperPort;
 
     this.hbaseConf = HBaseConfiguration.create();
+    logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
+        zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
-      hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
-      hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
+      hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
     }
     this.hbaseConfKey = new HConnectionKey(hbaseConf);
   }
@@ -79,7 +83,7 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
   @VisibleForTesting
   public void setZookeeperPort(int zookeeperPort) {
     this.zookeeperPort = zookeeperPort;
-    hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+    hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6b87817..3f20087 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -48,7 +48,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
 
   @JsonProperty
-  public final StoragePluginConfig storage;
+  public final HBaseStoragePluginConfig storage;
   @JsonIgnore
   private final HBaseStoragePlugin hbaseStoragePlugin;
   private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -61,7 +61,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
     hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
-    this.storage = storage;
+    this.storage = (HBaseStoragePluginConfig) storage;
     this.columns = columns;
   }
 
@@ -78,7 +78,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   }
 
   @JsonIgnore
-  public StoragePluginConfig getStorageConfig() {
+  public HBaseStoragePluginConfig getStorageConfig() {
     return storage;
   }
 
@@ -114,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, regionScanSpecList, columns);
+    return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c64dc97..dc08031 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -19,7 +19,7 @@
       "zookeeperPort" : 2181
     },
     columns: [
-      "`f2`.c1", "`f2`.c2"
+      "`f2`.c1", "`f2`.c2", "row_key"
     ]
   },
   {