You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hbase.apache.org by "Jim Kellerman (POWERSET)" <Ji...@microsoft.com> on 2008/10/23 00:35:45 UTC
RE: Improving locality of table access...
In the future, you should send HBase questions to the HBase user
mailing list: hbase-user@hadoop.apache.org if you want to get
a more timely response. HBase development is disconnected from
Hadoop development for the most part.
---
Jim Kellerman, Powerset (Live Search, Microsoft Corporation)
> -----Original Message-----
> From: Arthur van Hoff [mailto:avh@ellerdale.com]
> Sent: Wednesday, October 22, 2008 3:14 PM
> To: core-user@hadoop.apache.org
> Subject: Improving locality of table access...
>
> Hi,
>
> Below is some code for improving the read performance of large tables by
> processing each region on the host holding that region. We measured 50-60%
> lower network bandwidth.
>
> To use this class instead of
> org.apache.hadoop.hbase.mapred.TableInputFormat
> class use:
>
> jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix.class);
>
> Please send me feedback if you can think of better ways to do this.
>
> --
> Arthur van Hoff - Grand Master of Alphabetical Order
> The Ellerdale Project, Menlo Park, CA
> avh@ellerdale.com, 650-283-0842
>
>
> -- TableInputFormatFix.java --
>
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> // Author: Arthur van Hoff, avh@ellerdale.com
>
> package ellerdale.mapreduce;
>
> import java.io.*;
> import java.util.*;
>
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.util.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.mapred.*;
>
> import org.apache.hadoop.hbase.*;
> import org.apache.hadoop.hbase.mapred.*;
> import org.apache.hadoop.hbase.client.*;
> import org.apache.hadoop.hbase.client.Scanner;
> import org.apache.hadoop.hbase.io.*;
> import org.apache.hadoop.hbase.util.*;
>
> //
> // Attempt to fix the localized nature of table segments.
> // Compute table splits so that they are processed locally.
> // Combine multiple splits to avoid the number of splits exceeding numSplits.
> // Sort the resulting splits so that the shortest ones are processed last.
> // The resulting savings in network bandwidth are significant (we measured 60%).
> //
> public class TableInputFormatFix extends TableInputFormat
> {
> public static final int ORIGINAL = 0; // splits by region start key, no location information
> public static final int LOCALIZED = 1; // like ORIGINAL, plus the likely host of each split's first region
> public static final int OPTIMIZED = 2; // bonds several regions of one host into each split; author's note: not yet functional
>
> //
> // A table split with a location.
> //
> static class LocationTableSplit extends TableSplit implements
> Comparable
> {
> String location;
>
> public LocationTableSplit()
> {
> }
> public LocationTableSplit(byte [] tableName, byte [] startRow, byte []
> endRow, String location)
> {
> super(tableName, startRow, endRow);
> this.location = location;
> }
> public String[] getLocations()
> {
> return new String[] {location};
> }
> public void readFields(DataInput in) throws IOException
> {
> super.readFields(in);
> this.location = Bytes.toString(Bytes.readByteArray(in));
> }
> public void write(DataOutput out) throws IOException
> {
> super.write(out);
> Bytes.writeByteArray(out, Bytes.toBytes(location));
> }
> public int compareTo(Object other)
> {
> LocationTableSplit otherSplit = (LocationTableSplit)other;
> int result = Bytes.compareTo(getStartRow(),
> otherSplit.getStartRow());
> return result;
> }
> public String toString()
> {
> return location.substring(0, location.indexOf('.')) + ": " +
> Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow());
> }
> }
>
> //
> // A table split with a location that covers multiple regions.
> //
> /**
>  * A located split that covers several regions. {@code regions} is a flat
>  * array of key pairs: element 2*k is the start key and element 2*k+1 the
>  * end key of the k-th region.
>  */
> static class MultiRegionTableSplit extends LocationTableSplit
> {
>   byte[][] regions;
>
>   /** No-argument constructor; fields are filled in by {@link #readFields}. */
>   public MultiRegionTableSplit()
>   {
>   }
>
>   /**
>    * @param tableName name of the table being split
>    * @param location  host name reported as the split location
>    * @param regions   flat start/end key pairs; the first element becomes
>    *                  the split's start row and the last its end row
>    */
>   public MultiRegionTableSplit(byte[] tableName, String location, byte[][] regions) throws IOException
>   {
>     // The superclass constructor already stores the location.
>     super(tableName, regions[0], regions[regions.length-1], location);
>     this.regions = regions;
>   }
>
>   /** Reads the superclass fields, then a count followed by that many keys. */
>   public void readFields(DataInput in) throws IOException
>   {
>     super.readFields(in);
>     int n = in.readInt();
>     regions = new byte[n][];
>     for (int i = 0 ; i < n ; i++) {
>       regions[i] = Bytes.readByteArray(in);
>     }
>   }
>
>   /** Writes the superclass fields, then the key count followed by every key. */
>   public void write(DataOutput out) throws IOException
>   {
>     super.write(out);
>     out.writeInt(regions.length);
>     for (int i = 0 ; i < regions.length ; i++) {
>       Bytes.writeByteArray(out, regions[i]);
>     }
>   }
>
>   /** @return "shortHost: start-end, start-end, ..." listing every region */
>   public String toString()
>   {
>     // Use the whole name when the host has no '.'; the previous
>     // substring(0, indexOf('.')) threw StringIndexOutOfBoundsException.
>     String host = String.valueOf(location);
>     int dot = host.indexOf('.');
>     StringBuilder str = new StringBuilder(dot < 0 ? host : host.substring(0, dot));
>     str.append(": ");
>     for (int i = 0 ; i < regions.length ; i += 2) {
>       if (i > 0) {
>         str.append(", ");
>       }
>       str.append(Bytes.toString(regions[i])).append("-").append(Bytes.toString(regions[i+1]));
>     }
>     return str.toString();
>   }
>
>   /**
>    * Orders splits with more region keys first; ties are broken by start row.
>    *
>    * @param other the split to compare with; must be a MultiRegionTableSplit
>    * @throws ClassCastException if {@code other} is of a different type
>    */
>   public int compareTo(Object other)
>   {
>     MultiRegionTableSplit otherSplit = (MultiRegionTableSplit)other;
>     // Explicit comparison instead of subtracting the lengths.
>     if (otherSplit.regions.length != regions.length) {
>       return otherSplit.regions.length < regions.length ? -1 : 1;
>     }
>     return Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
>   }
> }
>
> //
> // TableRecordReader that can handle multiple regions.
> //
> protected class MultiRegionTableRecordReader implements
> RecordReader<ImmutableBytesWritable, RowResult>
> {
> private HTable htable;
> private byte [][] trrInputColumns;
>
> private int currentregion;
> private byte[][] regions;
> private byte [] lastRow;
> private Scanner scanner;
>
> void setHTable(HTable htable)
> {
> this.htable = htable;
> }
> void setInputColumns(final byte [][] inputColumns)
> {
> this.trrInputColumns = inputColumns;
> }
> void setRegions(byte[][] regions)
> {
> this.regions = regions;
> }
>
> int getRegion(byte[] row)
> {
> for (int i = 0 ; i < regions.length ; i += 2) {
> byte[] startRow = regions[i + 0];
> byte[] endRow = regions[i + 1];
> if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0) {
> continue;
> }
> if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) {
> continue;
> }
> return i/2;
> }
> return -1;
> }
>
> //
> // The buisiness end the the reader
> //
> public void init() throws IOException
> {
> restart(regions[0]);
> }
> public void restart(byte[] row) throws IOException
> {
> currentregion = getRegion(row);
> byte[] startRow = regions[currentregion*2 + 0];
> byte[] endRow = regions[currentregion*2 + 1];
> if (endRow.length > 0) {
> scanner = htable.getScanner(trrInputColumns, startRow, endRow);
> } else {
> scanner = htable.getScanner(trrInputColumns, startRow);
> }
> }
>
> public ImmutableBytesWritable createKey()
> {
> return new ImmutableBytesWritable();
> }
> public RowResult createValue()
> {
> return new RowResult();
> }
>
> public long getPos()
> {
> return 0;
> }
> public float getProgress()
> {
> int nregions = regions.length/2;
> return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f;
> }
>
> public boolean next(ImmutableBytesWritable key, RowResult value)
> throws
> IOException
> {
> while (true) {
> RowResult result;
> try {
> result = scanner.next();
> } catch (UnknownScannerException e) {
> restart(lastRow);
> scanner.next(); // skip presumed already mapped row
> result = scanner.next();
> }
> boolean hasMore = result != null && result.size() > 0;
> if (!hasMore && currentregion+1 < regions.length/2) {
> // move to the next region
> restart(regions[(currentregion+1)*2]);
> continue;
> }
> if (hasMore) {
> key.set(result.getRow());
> lastRow = key.get();
> Writables.copyWritable(result, value);
> }
> return hasMore;
> }
> }
>
> public void close()
> {
> scanner.close();
> }
> }
>
> //
> // Main class
> //
>
> int type; // split algorithm selector: ORIGINAL, LOCALIZED or OPTIMIZED
> HTable table; // table to read; assigned by setHTable
> byte[][] inputColumns; // columns to scan; assigned by setInputColums
> MultiRegionTableRecordReader reader; // the one reader instance returned by getRecordReader
>
> public TableInputFormatFix()
> {
> this(OPTIMIZED); // the no-argument form selects the multi-region algorithm
> }
> public TableInputFormatFix(int type)
> {
> this.type = type;
> this.reader = new MultiRegionTableRecordReader();
> }
>
> /**
>  * Hands the table to the superclass and keeps a local reference that
>  * getSplits and getRecordReader read.
>  *
>  * @param table the table this input format scans
>  */
> protected void setHTable(HTable table)
> {
>   super.setHTable(table);
>   this.table = table;
> }
> /**
>  * Hands the column list to the superclass and keeps a local reference
>  * that getSplits and getRecordReader read.
>  *
>  * @param inputColumns the columns to scan
>  */
> protected void setInputColums(byte [][] inputColumns)
> {
>   super.setInputColums(inputColumns);
>   this.inputColumns = inputColumns;
> }
>
> //
> // Create RecordReader
> //
> public RecordReader<ImmutableBytesWritable, RowResult>
> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
> IOException
> {
> TableSplit tSplit = (TableSplit) split;
> MultiRegionTableRecordReader trr = reader;
> trr.setHTable(this.table);
> trr.setInputColumns(this.inputColumns);
> //trr.setRowFilter(this.rowFilter);
> if (tSplit instanceof MultiRegionTableSplit) {
> trr.setRegions(((MultiRegionTableSplit)tSplit).regions);
> } else {
> trr.setRegions(new byte[][] {tSplit.getStartRow(),
> tSplit.getEndRow()});
> }
> trr.init();
> return trr;
> }
>
> //
> // Compute the splits.
> //
> public InputSplit[] getSplits(JobConf job, int numSplits) throws
> IOException
> {
> InputSplit[] splits = null;
> byte [][] startKeys = this.table.getStartKeys();
> if (startKeys == null || startKeys.length == 0) {
> throw new IOException("Expecting at least one region");
> }
> if (this.table == null) {
> throw new IOException("No table was provided");
> }
> if (this.inputColumns == null || this.inputColumns.length == 0) {
> throw new IOException("Expecting at least one column");
> }
>
> switch (type) {
> case ORIGINAL:
> {
> // This is the original algorithm with no locations.
> int realNumSplits = numSplits > startKeys.length ?
> startKeys.length
> : numSplits;
> splits = new InputSplit[realNumSplits];
> int middle = startKeys.length / realNumSplits;
> int startPos = 0;
> for (int i = 0; i < realNumSplits; i++) {
> int lastPos = startPos + middle;
> lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
> splits[i] = new TableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW);
>
> startPos = lastPos;
> }
> break;
> }
> case LOCALIZED:
> {
> // This is the original algorithm with a minor fix for adding the
> likely location of each region.
> int realNumSplits = numSplits > startKeys.length ?
> startKeys.length
> : numSplits;
> splits = new InputSplit[realNumSplits];
> int middle = startKeys.length / realNumSplits;
> int startPos = 0;
> for (int i = 0; i < realNumSplits; i++) {
> int lastPos = startPos + middle;
> lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
> String regionLocation =
> table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostnam
> e();
> splits[i] = new LocationTableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW, regionLocation);
>
> startPos = lastPos;
> }
> break;
> }
> case OPTIMIZED:
> {
> // This is an optimized algorithm that bonds multiple regions
> together in each split.
> int nregions = 0;
> int nhosts = 0;
> HashMap<String, ArrayList<HRegionInfo>> hosts = new
> HashMap<String,
> ArrayList<HRegionInfo>>();
> for (java.util.Map.Entry<HRegionInfo,HServerAddress> e :
> table.getRegionsInfo().entrySet()) {
> String host = e.getValue().getHostname();
> ArrayList<HRegionInfo> regions = hosts.get(host);
> if (regions == null) {
> regions = new ArrayList<HRegionInfo>();
> hosts.put(host, regions);
> nhosts++;
> }
> regions.add(e.getKey());
> nregions++;
> }
> if (numSplits < nhosts) {
> numSplits = nhosts;
> }
> if (numSplits > nregions) {
> numSplits = nregions;
> }
> float sph = (float)numSplits/nhosts;
> float sphremainder = 0f;
>
> ArrayList<InputSplit> splitlist = new ArrayList<InputSplit>();
> for (String host : hosts.keySet()) {
> ArrayList<HRegionInfo> regions = hosts.get(host);
> float rps = ((regions.size() - 1) + sphremainder) / (sph - 1);
> sphremainder = sph;
> float rpsremainder = 0f;
> for (int i = 0 ; i < regions.size() ;) {
> rpsremainder += rps;
> int splitSize = Math.max(1, (int)rpsremainder);
> if (i + splitSize > regions.size()) {
> splitSize = regions.size() - i;
> }
> //System.out.println(host + ": " + numSplits + "/" + nregions
> +
> "/" + splitSize + ":");
> byte[][] splitregions = new byte[splitSize*2][];
> for (int j = 0 ; j < splitSize ; j++) {
> HRegionInfo region = regions.get(i + j);
> splitregions[j*2 + 0] = region.getStartKey();
> splitregions[j*2 + 1] = region.getEndKey();
> }
> splitlist.add(new MultiRegionTableSplit(table.getTableName(),
> host, splitregions));
>
> i += splitSize;
> rpsremainder -= splitSize;
> sphremainder -= 1;
> }
> }
>
> // copy into a real array (there must be a better way)
> int n = splitlist.size();
> //n = 1;
> splits = new InputSplit[n];
> for (int i = splits.length ; i-- > 0 ;) {
> splits[i] = splitlist.get(i);
> }
> Arrays.sort(splits);
> break;
> }
> }
>
> // dump the splits
> if (false) {
> System.out.println("---- " + splits.length + " splits ----");
> for (int i = 0 ; i < splits.length ; i++) {
> System.out.println(i + ": " + splits[i].toString());
> }
> System.exit(1);
> }
>
> // return the splits
> return splits;
> }
> }