You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:42:15 UTC
[26/50] [abbrv] phoenix git commit: PHOENIX-3600 Core MapReduce
classes don't provide location info
PHOENIX-3600 Core MapReduce classes don't provide location info
This mostly just ports the same functionality from the phoenix-hive MR
classes to the main classes. Adds a new configuration parameter
'phoenix.mapreduce.split.by.stats', defaulting to true, to create
input splits based on the scans provided by statistics, not just the
region locations.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1b1cd87
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1b1cd87
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1b1cd87
Branch: refs/heads/encodecolumns2
Commit: e1b1cd8733d7adfca3a17899630c73881af187f1
Parents: 44dc576
Author: Josh Mahonin <jm...@gmail.com>
Authored: Mon Feb 13 10:55:06 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Mon Feb 13 11:04:40 2017 -0500
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixInputFormat.java | 69 ++++++++++++++++++--
.../phoenix/mapreduce/PhoenixInputSplit.java | 23 ++++++-
.../util/PhoenixConfigurationUtil.java | 11 ++++
3 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index df96c7b..14f7b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -21,14 +21,18 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -42,6 +46,7 @@ import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -80,16 +85,72 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
final Configuration configuration = context.getConfiguration();
final QueryPlan queryPlan = getQueryPlan(context,configuration);
final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+ final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
return splits;
}
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException {
Preconditions.checkNotNull(qplan);
Preconditions.checkNotNull(splits);
+
+ // Get the RegionSizeCalculator
+ org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(config);
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
+ .getTableRef().getTable().getPhysicalName().toString()));
+ RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
+ .getAdmin());
+
+
final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
for (List<Scan> scans : qplan.getScans()) {
- psplits.add(new PhoenixInputSplit(scans));
+ // Get the region location
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false
+ );
+
+ String regionLocation = location.getHostname();
+
+ // Get the region size
+ long regionSize = sizeCalculator.getRegionSize(
+ location.getRegionInfo().getRegionName()
+ );
+
+ // Generate splits based off statistics, or just region splits?
+ boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
+
+ if(splitByStats) {
+ for(Scan aScan: scans) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
+ .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
+ aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
+ .getBatch() + "] and regionLocation : " + regionLocation);
+ }
+
+ psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation));
+ }
+ }
+ else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
+ .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
+ .size() - 1).getStopRow()));
+ LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
+ .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
+ "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
+ regionLocation);
+
+ for (int i = 0, limit = scans.size(); i < limit; i++) {
+ LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
+ .toStringBinary(scans.get(i).getAttribute
+ (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+ }
+ }
+
+ psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
+ }
}
return psplits;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index caee3cd..6d3c5e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +40,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
private List<Scan> scans;
private KeyRange keyRange;
+ private String regionLocation = null;
+ private long regionSize = 0;
/**
* No Arg constructor
@@ -53,9 +54,15 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
* @param keyRange
*/
public PhoenixInputSplit(final List<Scan> scans) {
+ this(scans, 0, null);
+ }
+
+ public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) {
Preconditions.checkNotNull(scans);
Preconditions.checkState(!scans.isEmpty());
this.scans = scans;
+ this.regionSize = regionSize;
+ this.regionLocation = regionLocation;
init();
}
@@ -73,6 +80,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void readFields(DataInput input) throws IOException {
+ regionLocation = WritableUtils.readString(input);
+ regionSize = WritableUtils.readVLong(input);
int count = WritableUtils.readVInt(input);
scans = Lists.newArrayListWithExpectedSize(count);
for (int i = 0; i < count; i++) {
@@ -87,6 +96,9 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void write(DataOutput output) throws IOException {
+ WritableUtils.writeString(output, regionLocation);
+ WritableUtils.writeVLong(output, regionSize);
+
Preconditions.checkNotNull(scans);
WritableUtils.writeVInt(output, scans.size());
for (Scan scan : scans) {
@@ -99,12 +111,17 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
- return 0;
+ return regionSize;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
+ if(regionLocation == null) {
+ return new String[]{};
+ }
+ else {
+ return new String[]{regionLocation};
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index f3e4450..1d2cbbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -106,6 +106,11 @@ public final class PhoenixConfigurationUtil {
public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
+ // Generate splits based on scans from stats, or just from region splits
+ public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
+
+ public static final boolean DEFAULT_SPLIT_BY_STATS = true;
+
public enum SchemaType {
TABLE,
QUERY;
@@ -459,4 +464,10 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(DISABLED_INDEXES);
}
+
+ public static boolean getSplitByStats(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);
+ return split;
+ }
}