You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:34 UTC
[47/50] [abbrv] drill git commit: DRILL-4199: Add Support for HBase
1.X
DRILL-4199: Add Support for HBase 1.X
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1882d938
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1882d938
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1882d938
Branch: refs/heads/master
Commit: 1882d938f0ed94679f1a2766181eca2a02eb555a
Parents: 5fa9ba3
Author: Aditya <ad...@mapr.com>
Authored: Wed Mar 30 17:55:59 2016 -0700
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:41 2016 -0700
----------------------------------------------------------------------
contrib/format-maprdb/pom.xml | 2 +-
.../exec/store/mapr/db/MapRDBFormatPlugin.java | 20 +++++++++-
.../exec/store/mapr/db/MapRDBGroupScan.java | 4 +-
.../store/mapr/db/MapRDBScanBatchCreator.java | 6 +--
.../drill/exec/store/mapr/db/MapRDBSubScan.java | 39 +++++++++++---------
.../mapr/db/binary/BinaryTableGroupScan.java | 30 +++++++--------
.../db/binary/CompareFunctionsProcessor.java | 12 +++---
.../drill/maprdb/tests/json/BaseJsonTest.java | 2 +-
8 files changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index d14a2c5..fb01818 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -32,7 +32,7 @@
<properties>
<maprdb-storage-plugin.mapr.version>5.1.0.37817-mapr</maprdb-storage-plugin.mapr.version>
- <maprdb-storage-plugin.hbase.version>0.98.12-mapr-1506</maprdb-storage-plugin.hbase.version>
+ <maprdb-storage-plugin.hbase.version>1.1.1-mapr-1602-m7-5.1.0</maprdb-storage-plugin.hbase.version>
<maprdb-storage-plugin.hadoop.version>2.7.0-mapr-1602</maprdb-storage-plugin.hadoop.version>
<maprdb.TestSuite>**/MaprDBTestsSuite.class</maprdb.TestSuite>
</properties>
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index 9fe16e4..755ae4f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -35,6 +35,9 @@ import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableSet;
@@ -44,11 +47,16 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class);
private final MapRDBFormatMatcher matcher;
+ private final Configuration hbaseConf;
+ private final Connection connection;
public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) {
+ StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) throws IOException {
super(name, context, fsConf, storageConfig, formatConfig);
matcher = new MapRDBFormatMatcher(this);
+ hbaseConf = HBaseConfiguration.create(fsConf);
+ hbaseConf.set(ConnectionFactory.DEFAULT_DB, ConnectionFactory.MAPR_ENGINE2);
+ connection = ConnectionFactory.createConnection(hbaseConf);
}
@Override
@@ -79,4 +87,14 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
}
}
+ @JsonIgnore
+ public Configuration getHBaseConf() {
+ return hbaseConf;
+ }
+
+ @JsonIgnore
+ public Connection getConnection() {
+ return connection;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 8563b78..e6c71e0 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -49,9 +49,9 @@ import com.google.common.collect.Sets;
public abstract class MapRDBGroupScan extends AbstractGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
- private FileSystemPlugin storagePlugin;
+ protected FileSystemPlugin storagePlugin;
- private MapRDBFormatPlugin formatPlugin;
+ protected MapRDBFormatPlugin formatPlugin;
protected MapRDBFormatPluginConfig formatPluginConfig;
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index 1d51223..c989bb0 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -29,8 +29,6 @@ import org.apache.drill.exec.store.hbase.HBaseRecordReader;
import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -42,11 +40,11 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
- Configuration conf = HBaseConfiguration.create();
for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
- readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
+ readers.add(new HBaseRecordReader(subScan.getFormatPlugin().getConnection(),
+ getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
} else {
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
index dea6867..794141c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -44,37 +44,37 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
@JsonProperty
- public final StoragePluginConfig storage;
+ public final StoragePluginConfig storageConfig;
@JsonIgnore
- private final MapRDBFormatPluginConfig fsFormatPluginConfig;
- private final FileSystemPlugin fsStoragePlugin;
+ private final MapRDBFormatPluginConfig formatPluginConfig;
+ private final FileSystemPlugin storagePlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
private final String tableType;
+ private final MapRDBFormatPlugin formatPlugin;
+
@JsonCreator
public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
- @JsonProperty("storage") StoragePluginConfig storage,
+ @JsonProperty("storageConfig") StoragePluginConfig storage,
@JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("tableType") String tableType) throws ExecutionSetupException {
- super(userName);
- this.fsFormatPluginConfig = formatPluginConfig;
- this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
- this.regionScanSpecList = regionScanSpecList;
- this.storage = storage;
- this.columns = columns;
- this.tableType = tableType;
+ this(userName, formatPluginConfig,
+ (FileSystemPlugin) registry.getPlugin(storage),
+ storage, regionScanSpecList, columns, tableType);
}
- public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
+ public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig storageConfig,
List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
super(userName);
- fsFormatPluginConfig = formatPluginConfig;
- fsStoragePlugin = storagePlugin;
- storage = config;
+ this.storageConfig = storageConfig;
+ this.storagePlugin = storagePlugin;
+ this.formatPluginConfig = formatPluginConfig;
+ this.formatPlugin = (MapRDBFormatPlugin) storagePlugin.getFormatPlugin(formatPluginConfig);
+
this.regionScanSpecList = maprSubScanSpecs;
this.columns = columns;
this.tableType = tableType;
@@ -101,7 +101,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
+ return new MapRDBSubScan(getUserName(), formatPluginConfig, storagePlugin, storageConfig, regionScanSpecList, columns, tableType);
}
@Override
@@ -119,7 +119,12 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
}
public MapRDBFormatPluginConfig getFormatPluginConfig() {
- return fsFormatPluginConfig;
+ return formatPluginConfig;
+ }
+
+ @JsonIgnore
+ public MapRDBFormatPlugin getFormatPlugin() {
+ return formatPlugin;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index a597995..4eaeee7 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -21,8 +21,6 @@ import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty;
import java.io.IOException;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -45,11 +43,12 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.codehaus.jackson.annotate.JsonCreator;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -112,23 +111,22 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
private void init() {
logger.debug("Getting region locations");
- try {
- Configuration conf = HBaseConfiguration.create();
- HTable table = new HTable(conf, hbaseScanSpec.getTableName());
- tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
- this.hTableDesc = table.getTableDescriptor();
- NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
- table.close();
+ TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
+ try (Admin admin = formatPlugin.getConnection().getAdmin();
+ RegionLocator locator = formatPlugin.getConnection().getRegionLocator(tableName)) {
+ hTableDesc = admin.getTableDescriptor(tableName);
+ tableStats = new MapRDBTableStats(getHBaseConf(), hbaseScanSpec.getTableName());
boolean foundStartRegion = false;
regionsToScan = new TreeMap<TabletFragmentInfo, String>();
- for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
- HRegionInfo regionInfo = mapEntry.getKey();
+ List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
+ for (HRegionLocation regionLocation : regionLocations) {
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
continue;
}
foundStartRegion = true;
- regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
+ regionsToScan.put(new TabletFragmentInfo(regionInfo), regionLocation.getHostname());
if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
break;
}
@@ -191,7 +189,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
@JsonIgnore
public Configuration getHBaseConf() {
- return HBaseConfiguration.create();
+ return getFormatPlugin().getHBaseConf();
}
@JsonIgnore
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
index 0c901d7..a83abf3 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/CompareFunctionsProcessor.java
@@ -40,7 +40,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.hadoop.hbase.util.Order;
import org.apache.hadoop.hbase.util.PositionedByteRange;
-import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
import org.apache.hadoop.hbase.HConstants;
@@ -256,7 +256,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "DOUBLE_OBD":
if (valueArg instanceof DoubleExpression) {
bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+ PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
@@ -271,7 +271,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "FLOAT_OBD":
if (valueArg instanceof FloatExpression) {
bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+ PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
@@ -286,7 +286,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "BIGINT_OBD":
if (valueArg instanceof LongExpression) {
bb = newByteBuf(9, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+ PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
((LongExpression)valueArg).getLong(), Order.DESCENDING);
@@ -301,7 +301,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "INT_OBD":
if (valueArg instanceof IntExpression) {
bb = newByteBuf(5, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+ PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
((IntExpression)valueArg).getInt(), Order.DESCENDING);
@@ -317,7 +317,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
if (valueArg instanceof QuotedString) {
int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
bb = newByteBuf(stringLen + 2, true);
- PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
+ PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, stringLen + 2);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
((QuotedString)valueArg).value, Order.DESCENDING);
http://git-wip-us.apache.org/repos/asf/drill/blob/1882d938/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
index 6aafed3..b503b00 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.hbase.GuavaPatcher;
+import org.apache.drill.exec.util.GuavaPatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;