You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2017/02/13 16:03:19 UTC
[1/2] phoenix git commit: PHOENIX-3600 Core MapReduce classes don't
provide location info
Repository: phoenix
Updated Branches:
refs/heads/master 41d6349bd -> 8f2d0fbc5
PHOENIX-3600 Core MapReduce classes don't provide location info
This mostly just ports the same functionality in the phoenix-hive MR
classes to the main classes. Adds a new configuration parameter
'phoenix.mapreduce.split.by.stats', defaulting to true, to create
input splits based on the scans provided by statistics, not just the
region locations.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/267323da
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/267323da
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/267323da
Branch: refs/heads/master
Commit: 267323da8242fb6f0953c1a75cf96c5fde3d49ed
Parents: 41d6349
Author: Josh Mahonin <jm...@gmail.com>
Authored: Mon Feb 13 10:55:06 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Mon Feb 13 10:55:06 2017 -0500
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixInputFormat.java | 69 ++++++++++++++++++--
.../phoenix/mapreduce/PhoenixInputSplit.java | 23 ++++++-
.../util/PhoenixConfigurationUtil.java | 11 ++++
3 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index df96c7b..14f7b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -21,14 +21,18 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -42,6 +46,7 @@ import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -80,16 +85,72 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
final Configuration configuration = context.getConfiguration();
final QueryPlan queryPlan = getQueryPlan(context,configuration);
final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+ final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
return splits;
}
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException {
Preconditions.checkNotNull(qplan);
Preconditions.checkNotNull(splits);
+
+ // Get the RegionSizeCalculator
+ org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(config);
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
+ .getTableRef().getTable().getPhysicalName().toString()));
+ RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
+ .getAdmin());
+
+
final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
for (List<Scan> scans : qplan.getScans()) {
- psplits.add(new PhoenixInputSplit(scans));
+ // Get the region location
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false
+ );
+
+ String regionLocation = location.getHostname();
+
+ // Get the region size
+ long regionSize = sizeCalculator.getRegionSize(
+ location.getRegionInfo().getRegionName()
+ );
+
+ // Generate splits based off statistics, or just region splits?
+ boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
+
+ if(splitByStats) {
+ for(Scan aScan: scans) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
+ .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
+ aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
+ .getBatch() + "] and regionLocation : " + regionLocation);
+ }
+
+ psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation));
+ }
+ }
+ else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
+ .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
+ .size() - 1).getStopRow()));
+ LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
+ .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
+ "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
+ regionLocation);
+
+ for (int i = 0, limit = scans.size(); i < limit; i++) {
+ LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
+ .toStringBinary(scans.get(i).getAttribute
+ (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+ }
+ }
+
+ psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
+ }
}
return psplits;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index caee3cd..6d3c5e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +40,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
private List<Scan> scans;
private KeyRange keyRange;
+ private String regionLocation = null;
+ private long regionSize = 0;
/**
* No Arg constructor
@@ -53,9 +54,15 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
* @param keyRange
*/
public PhoenixInputSplit(final List<Scan> scans) {
+ this(scans, 0, null);
+ }
+
+ public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) {
Preconditions.checkNotNull(scans);
Preconditions.checkState(!scans.isEmpty());
this.scans = scans;
+ this.regionSize = regionSize;
+ this.regionLocation = regionLocation;
init();
}
@@ -73,6 +80,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void readFields(DataInput input) throws IOException {
+ regionLocation = WritableUtils.readString(input);
+ regionSize = WritableUtils.readVLong(input);
int count = WritableUtils.readVInt(input);
scans = Lists.newArrayListWithExpectedSize(count);
for (int i = 0; i < count; i++) {
@@ -87,6 +96,9 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void write(DataOutput output) throws IOException {
+ WritableUtils.writeString(output, regionLocation);
+ WritableUtils.writeVLong(output, regionSize);
+
Preconditions.checkNotNull(scans);
WritableUtils.writeVInt(output, scans.size());
for (Scan scan : scans) {
@@ -99,12 +111,17 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
- return 0;
+ return regionSize;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
+ if(regionLocation == null) {
+ return new String[]{};
+ }
+ else {
+ return new String[]{regionLocation};
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index f3e4450..1d2cbbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -106,6 +106,11 @@ public final class PhoenixConfigurationUtil {
public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
+ // Generate splits based on scans from stats, or just from region splits
+ public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
+
+ public static final boolean DEFAULT_SPLIT_BY_STATS = true;
+
public enum SchemaType {
TABLE,
QUERY;
@@ -459,4 +464,10 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(DISABLED_INDEXES);
}
+
+ public static boolean getSplitByStats(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);
+ return split;
+ }
}
[2/2] phoenix git commit: PHOENIX-3601 PhoenixRDD doesn't expose the
preferred node locations to Spark
Posted by jm...@apache.org.
PHOENIX-3601 PhoenixRDD doesn't expose the preferred node locations to Spark
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f2d0fbc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f2d0fbc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f2d0fbc
Branch: refs/heads/master
Commit: 8f2d0fbc5e4d14dc04c2491d78cea1a4b93be0b7
Parents: 267323d
Author: Josh Mahonin <jm...@gmail.com>
Authored: Mon Feb 13 10:58:02 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Mon Feb 13 10:58:02 2017 -0500
----------------------------------------------------------------------
.../src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f2d0fbc/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 01a9077..63547d2 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -55,6 +55,10 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
phoenixRDD.partitions
}
+ override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ phoenixRDD.preferredLocations(split)
+ }
+
@DeveloperApi
override def compute(split: Partition, context: TaskContext) = {
phoenixRDD.compute(split, context).map(r => r._2)