Posted to user@hbase.apache.org by Ruslan Salyakhov <ru...@gmail.com> on 2010/03/25 02:02:09 UTC

Bulk import, HFiles, Multiple reducers and TotalOrderPartitioner

Hi!

I'm trying to use the bulk import that writes HFiles directly into HDFS, and
I have a problem with multiple reducers. If I run the MR job to prepare HFiles
with more than one reducer, then some values for keys do not appear in the
table after running the loadtable.rb script. With one reducer everything works
fine. Let's take a look at the details:

Environment:
- Hadoop 0.20.1
- HBase release 0.20.3

http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
- the row id must be formatted as an ImmutableBytesWritable
- the MR job should ensure a total ordering among all keys
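
For illustration, here is a minimal sketch of a mapper that matches those two
points (the class name, input parsing, and column family/qualifier below are
just placeholders, not my real code): the map output key is the row id wrapped
in an ImmutableBytesWritable and the value is a KeyValue.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Example mapper: emits the row id as an ImmutableBytesWritable and the cell
// as a KeyValue, so HFileOutputFormat + KeyValueSortReducer can consume it.
public class ExampleHFileMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private static final byte[] FAMILY = Bytes.toBytes("vals");     // placeholder family
    private static final byte[] QUALIFIER = Bytes.toBytes("value"); // placeholder qualifier

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Assume a tab-separated line: <rowkey>\t<value>
        String[] parts = line.toString().split("\t", 2);
        if (parts.length < 2) {
            return; // skip malformed lines
        }
        byte[] row = Bytes.toBytes(parts[0]);
        KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER, Bytes.toBytes(parts[1]));
        context.write(new ImmutableBytesWritable(row), kv);
    }
}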

http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
- TotalOrderPartitioner that uses the new API

https://issues.apache.org/jira/browse/HBASE-2063
- patched HFileOutputFormat

Sample data of my keys:
1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
1.306.CAN.ON.-1.LONDON.1.1.0.1
1.306.USA.CO.751.FT COLLINS.1.1.1.0
1.306.USA.CO.751.LITTLETON.1.1.1.0
4.6.USA.TX.623.MUENSTER.1.1.0.0
4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
4.68.USA.CT.533.WILLINGTON.1.1.1.0
4.68.USA.LA.642.LAFAYETTE.1.1.1.0
4.9.USA.CT.501.STAMFORD.1.1.0.0
4.9.USA.NJ.504.PRINCETON.1.1.0.1
4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
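
These keys are compared as plain byte strings, so for example
"1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1" sorts before
"1.306.CAN.ON.-1.LONDON.1.1.0.1" because '.' (0x2E) comes before '0' (0x30) in
byte order. A tiny standalone check of that (just an illustration, not part of
my job):

import org.apache.hadoop.hbase.util.Bytes;

// Sanity check that the sample keys above really are in ascending byte order:
// '.' sorts before any digit, so "1.3.SWE..." < "1.306.CAN...".
public class KeyOrderCheck {
    public static void main(String[] args) {
        byte[] a = Bytes.toBytes("1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1");
        byte[] b = Bytes.toBytes("1.306.CAN.ON.-1.LONDON.1.1.0.1");
        System.out.println(Bytes.compareTo(a, b)); // negative: a sorts before b
    }
}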

I've put everything together:

1) A test of TotalOrderPartitioner that checks how it works with my keys.
Note that I've set up my comparator so that the test passes:
conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class,
Object.class);

import java.io.IOException;
import java.util.ArrayList;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TestTotalOrderPartitionerForHFileKeys extends TestCase {

    // Split points for 10 partitions; partition 0 covers (-inf, splitKeys[0]).
    private static final ImmutableBytesWritable[] splitKeys = new ImmutableBytesWritable[] {
            new ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),       // 1
            new ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),        // 2
            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW YORK.1.1.0.0")),        // 3
            new ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),        // 4
            new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),        // 5
            new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),     // 6
            new ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")), // 7
            new ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),          // 8
            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY CITY.1.1.0.0"))       // 9
    };

    // Each Check pairs a test key with the partition it is expected to land in.
    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
            new ArrayList<Check<ImmutableBytesWritable>>();
    static {
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
    }

    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable> partitioner =
                new TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>();
        Configuration conf = new Configuration();
        Path p = TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable> writePartitionFile(
                "totalorderbinarysearch", conf, splitKeys);
        conf.setBoolean("total.order.partitioner.natural.order", false);
        conf.setClass("mapred.mapoutput.key.class", ImmutableBytesWritable.class, Object.class);
        conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

        try {
            partitioner.setConf(conf);
            NullWritable nw = NullWritable.get();
            for (Check<ImmutableBytesWritable> chk : testKeys) {
                log(Bytes.toString(chk.data.get()) + ", chk.part: " + chk.part + ", should be: "
                        + partitioner.getPartition(chk.data, nw, splitKeys.length + 1));

                assertEquals(Bytes.toString(chk.data.get()), chk.part,
                        partitioner.getPartition(chk.data, nw, splitKeys.length + 1));
            }
        } finally {
            p.getFileSystem(conf).delete(p, true);
        }
    }

    public void testMyKeyComparator() {
        MyKeyComparator comparator = new MyKeyComparator();
        for (int i = 0; i < splitKeys.length - 1; i++) {
            // splitKeys should be sorted in ascending order
            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
            assertTrue(res1 < 0);

            int res2 = comparator.compare(splitKeys[i].get(), 0, splitKeys[i].get().length,
                    splitKeys[i + 1].get(), 0, splitKeys[i + 1].get().length);

            assertTrue(res2 < 0);
            assertTrue(res1 == res2);
        }
    }

    // ----------------------------------------
    // Copy-paste from TestTotalOrderPartitioner
    // http://issues.apache.org/jira/browse/MAPREDUCE-366
    // ----------------------------------------
    static class Check<T> {
        T data;
        int part;

        Check(T data, int part) {
            this.data = data;
            this.part = part;
        }
    }

    private static <T extends WritableComparable<?>> Path writePartitionFile(String testname,
            Configuration conf, T[] splits) throws IOException {
        final FileSystem fs = FileSystem.getLocal(conf);
        final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
                .makeQualified(fs);
        Path p = new Path(testdir, testname + "/_partition.lst");
        TotalOrderPartitioner.setPartitionFile(conf, p);
        conf.setInt("mapred.reduce.tasks", splits.length + 1);
        SequenceFile.Writer w = null;
        try {
            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(), NullWritable.class,
                    SequenceFile.CompressionType.NONE);
            for (int i = 0; i < splits.length; ++i) {
                w.append(splits[i], NullWritable.get());
            }
        } finally {
            if (null != w)
                w.close();
        }
        return p;
    }

    private static void log(String message) {
        System.out.println(message);
    }
}

2) MyKeyComparator
I wrote it to pass the test above.

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;

public class MyKeyComparator implements RawComparator<ImmutableBytesWritable> {

    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
        return Bytes.compareTo(o1.get(), o2.get());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
    }
}

3) MySampler
This code is based on InputSampler from
http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt),
BUT I've added the following line to getSample:
            reader.initialize(splits.get(i),
                new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
Without that line it doesn't work (createRecordReader in the new API returns a
reader that still has to be initialized before use).


import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * based on org.apache.hadoop.mapreduce.lib.partition.InputSampler
 */
public class MySampler extends Configured {
    private static final Log LOG = LogFactory.getLog(MySampler.class);

    public MySampler(Configuration conf) {
        setConf(conf);
    }

    /**
     * Sample from random points in the input.
     * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
     * each split.
     */
    public static class RandomSampler {
        private double freq;
        private final int numSamples;
        private final int maxSplitsSampled;

        /**
         * Create a new RandomSampler sampling <em>all</em> splits.
         * This will read every split at the client, which is very expensive.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         */
        public RandomSampler(double freq, int numSamples) {
            this(freq, numSamples, Integer.MAX_VALUE);
        }

        /**
         * Create a new RandomSampler.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
            this.freq = freq;
            this.numSamples = numSamples;
            this.maxSplitsSampled = maxSplitsSampled;
        }

        /**
         * Randomize the split order, then take the specified number of keys from
         * each split sampled, where each key is selected with the specified
         * probability and possibly replaced by a subsequently selected key when
         * the quota of keys from that split is satisfied.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public ImmutableBytesWritable[] getSample(InputFormat<ImmutableBytesWritable, Text> inf,
                Job job) throws IOException, InterruptedException {
            List<InputSplit> splits = inf.getSplits(job);
            ArrayList<ImmutableBytesWritable> samples =
                    new ArrayList<ImmutableBytesWritable>(numSamples);
            int splitsToSample = Math.min(maxSplitsSampled, splits.size());

            Random r = new Random();
            long seed = r.nextLong();
            r.setSeed(seed);
            LOG.debug("seed: " + seed);
            // shuffle splits
            for (int i = 0; i < splits.size(); ++i) {
                InputSplit tmp = splits.get(i);
                int j = r.nextInt(splits.size());
                splits.set(i, splits.get(j));
                splits.set(j, tmp);
            }
            // our target rate is in terms of the maximum number of sample splits,
            // but we accept the possibility of sampling additional splits to hit
            // the target sample keyset
            for (int i = 0; i < splitsToSample ||
                    (i < splits.size() && samples.size() < numSamples); ++i) {
                RecordReader<ImmutableBytesWritable, Text> reader = inf.createRecordReader(
                        splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                reader.initialize(splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                while (reader.nextKeyValue()) {
                    if (r.nextDouble() <= freq) {
                        if (samples.size() < numSamples) {
                            samples.add(composeKey(reader.getCurrentValue()));
                        } else {
                            // When exceeding the maximum number of samples, replace a
                            // random element with this one, then adjust the frequency
                            // to reflect the possibility of existing elements being
                            // pushed out
                            int ind = r.nextInt(numSamples);
                            if (ind != numSamples) {
                                samples.set(ind, composeKey(reader.getCurrentValue()));
                            }
                            freq *= (numSamples - 1) / (double) numSamples;
                        }
                    }
                }
                reader.close();
            }
            return (ImmutableBytesWritable[]) samples.toArray(
                    new ImmutableBytesWritable[samples.size()]);
        }
    }

    private static ImmutableBytesWritable composeKey(Text value) {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
        int cnt = 0;
        String[] vals = new String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
        while (itr.hasMoreTokens()) {
            vals[cnt] = itr.nextToken();
            cnt++;
        }

        String newKeyStr = AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
                AdvancedRawLogUploader.NUM_KEY_FILEDS,
                AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
        info(newKeyStr);
        ImmutableBytesWritable newKey = new ImmutableBytesWritable(Bytes.toBytes(newKeyStr));
        return newKey;
    }

    /**
     * Write a partition file for the given job, using the Sampler provided.
     * Queries the sampler for a sample keyset, sorts by the output key
     * comparator, selects the keys for each rank, and writes to the destination
     * returned from {@link TotalOrderPartitioner#getPartitionFile}.
     */
    @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
    public static void writePartitionFile(Job job, RandomSampler sampler)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = job.getConfiguration();
        final InputFormat inf =
                ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        int numPartitions = job.getNumReduceTasks();
        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
        LOG.info("Using " + samples.length + " samples");
        RawComparator<ImmutableBytesWritable> comparator =
                (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
        Arrays.sort(samples, comparator);
        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
        FileSystem fs = dst.getFileSystem(conf);
        if (fs.exists(dst)) {
            fs.delete(dst, false);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(fs,
                conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        float stepSize = samples.length / (float) numPartitions;
        int last = -1;
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
                ++k;
            }
            writer.append(samples[k], nullValue);
            last = k;
        }
        writer.close();
    }

    private static void info(String message) {
        // LOG.info(message);
        System.out.println(message);
    }
}

4) and finally the definition of my MR job:

MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000, 10);
in = in.makeQualified(in.getFileSystem(conf));
Path partitionFile = new Path(in.getParent(), "_partitions");
// Use the TotalOrderPartitioner based on the new API, from
// http://issues.apache.org/jira/browse/MAPREDUCE-366
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);

Job job = new Job(conf, "Write HFiles at " + out.getName());
job.setNumReduceTasks(numReduceTasks);
job.setJarByClass(MyHFilesWriter.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat.class);

//job.setSortComparatorClass(MyKeyComparator.class);
// if you uncomment the code above you will get:
/*
10/03/24 15:00:43 INFO mapred.JobClient: Task Id :
attempt_201003171417_0063_r_000000_0, Status : FAILED
java.io.IOException: Added a key not lexically larger than previous
key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag,
lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
    at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
    at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
    at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
*/

FileOutputFormat.setOutputPath(job, out);
FileInputFormat.addInputPath(job, in);
job.setPartitionerClass(TotalOrderPartitioner.class);
MySampler.writePartitionFile(job, sampler);

System.exit(job.waitForCompletion(true) ? 0 : 1);

So if I do not use the MyKeyComparator class (i.e. I leave
job.setSortComparatorClass(MyKeyComparator.class); commented out), nothing
changes: the job works, but the values for some keys do not appear in the
table. Otherwise (with MyKeyComparator) the "Added a key not lexically larger
than previous key" error occurs.

What am I doing wrong? I want to run my MR job with more than one reducer and
get all the data into the HBase table after loadtable.rb execution.
Thank you for reading this far, I hope you didn't get a headache :)

Ruslan Salyakhov | ruslan@jalent.ru

Re: Bulk import, HFiles, Multiple reducers and TotalOrderPartitioner

Posted by Ruslan Salyakhov <ru...@gmail.com>.
Jean-Daniel,
https://issues.apache.org/jira/browse/HBASE-2378

Ruslan

On Thu, Mar 25, 2010 at 8:22 PM, Jean-Daniel Cryans <jd...@apache.org> wrote:

> Ruslan,
>
> I see you did all the required homework but this mail is really hard
> to read. Can you create a jira
> (http://issues.apache.org/jira/browse/HBASE) and attach all the code?
> This will also make it easier to track.
>
> thx!
>
> J-D
>

Re: Bulk import, HFiles, Multiple reducers and TotalOrderPartitioner

Posted by Jean-Daniel Cryans <jd...@apache.org>.
Ruslan,

I see you did all the required homework but this mail is really hard
to read. Can you create a jira
(http://issues.apache.org/jira/browse/HBASE) and attach all the code?
This will also make it easier to track.

thx!

J-D

On Wed, Mar 24, 2010 at 6:02 PM, Ruslan Salyakhov <ru...@gmail.com> wrote:
> Hi!
>
> I'm trying to use bulk import that writing HFiles directly into HDFS and
> have a problem with multiple reducers. If I run MR to prepare HFIles with
> more than one reducer then some values for keys are not appeared in the
> table after loadtable.rb script execution. With one reducer everything works
> fine. Let's take a look at details:
>
> Environment:
> - Hadoop 0.20.1
> - HBase release 0.20.3
>
> http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
> - the row id must be formatted as a ImmutableBytesWritable
> - MR job should ensure a total ordering among all keys
>
> http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
> - TotalOrderPartitioner that uses the new API
>
> https://issues.apache.org/jira/browse/HBASE-2063
> - patched HFileOutputFormat
>
> Sample data of my keys:
> 1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
> 1.306.CAN.ON.-1.LONDON.1.1.0.1
> 1.306.USA.CO.751.FT COLLINS.1.1.1.0
> 1.306.USA.CO.751.LITTLETON.1.1.1.0
> 4.6.USA.TX.623.MUENSTER.1.1.0.0
> 4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
> 4.68.USA.CT.533.WILLINGTON.1.1.1.0
> 4.68.USA.LA.642.LAFAYETTE.1.1.1.0
> 4.9.USA.CT.501.STAMFORD.1.1.0.0
> 4.9.USA.NJ.504.PRINCETON.1.1.0.1
> 4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
>
> I've put everything together:
>
> 1) Test of TotalOrderPartitioner that checks how it works with my keys.
> note that I've set up my comparator to pass that test
> conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class,
> Object.class);
>
> import java.io.IOException;
> import java.util.ArrayList;
>
> import junit.framework.TestCase;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
>
> public class TestTotalOrderPartitionerForHFileKeys extends TestCase {
>
>    private static final ImmutableBytesWritable[] splitKeys = new
> ImmutableBytesWritable[] {
>            // -inf
>                    // 0
>            new
> ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),
>        // 1
>            new
> ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),
>        // 2
>            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW
> YORK.1.1.0.0")),         // 3
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),
>        // 4
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),
>        // 5
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),
>    // 6
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")),//
> 7
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),
>    // 8
>            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY
> CITY.1.1.0.0"))         // 9
>    };
>
>    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
> new ArrayList<Check<ImmutableBytesWritable>>();
>    static {
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
>    };
>
>    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
>        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>
> partitioner = new TotalOrderPartitioner<ImmutableBytesWritable,
> NullWritable>();
>        Configuration conf = new Configuration();
>        Path p =
> TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable>
> writePartitionFile(
>                "totalorderbinarysearch", conf, splitKeys);
>        conf.setBoolean("total.order.partitioner.natural.order", false);
>        conf.setClass("mapred.mapoutput.key.class",
> ImmutableBytesWritable.class, Object.class);
>        conf.setClass("mapred.output.key.comparator.class",
> MyKeyComparator.class, Object.class);
>
>        try {
>            partitioner.setConf(conf);
>            NullWritable nw = NullWritable.get();
>            for (Check<ImmutableBytesWritable> chk : testKeys) {
>                log(Bytes.toString(chk.data.get()) + ", chk.part: " +
> chk.part + ", should be: "
>                        + partitioner.getPartition(chk.data, nw,
> splitKeys.length + 1));
>
>                assertEquals(Bytes.toString(chk.data.get()), chk.part,
>                        partitioner.getPartition(chk.data, nw,
> splitKeys.length + 1));
>
>            }
>        } finally {
>            p.getFileSystem(conf).delete(p, true);
>        }
>    }
>
>    public void testInventoryKeyComparator() {
>        InventoryKeyComparator comparator = new InventoryKeyComparator();
>        for (int i = 0; i < splitKeys.length - 2; i++) {
>            // splitKeys should be sorted in ascending order
>            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
>            assertTrue(res1 < 0);
>
>            int res2 = comparator.compare(splitKeys[i].get(), 0,
> splitKeys[i].get().length,
>                    splitKeys[i + 1].get(), 0, splitKeys[i +
> 1].get().length);
>
>            assertTrue(res2 < 0);
>            assertTrue(res1 == res2);
>        }
>    }
>
>    // ----------------------------------------
>    // Copy-Paste from TestTotalOrderPartitoner
>    // http://issues.apache.org/jira/browse/MAPREDUCE-366
>    // ----------------------------------------
>    static class Check<T> {
>        T data;
>        int part;
>
>        Check(T data, int part) {
>            this.data = data;
>            this.part = part;
>        }
>    }
>
>    private static <T extends WritableComparable<?>> Path
> writePartitionFile(String testname,
>            Configuration conf, T[] splits) throws IOException {
>        final FileSystem fs = FileSystem.getLocal(conf);
>        final Path testdir = new Path(System.getProperty("test.build.data",
> "/tmp"))
>                .makeQualified(fs);
>        Path p = new Path(testdir, testname + "/_partition.lst");
>        TotalOrderPartitioner.setPartitionFile(conf, p);
>        conf.setInt("mapred.reduce.tasks", splits.length + 1);
>        SequenceFile.Writer w = null;
>        try {
>            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(),
> NullWritable.class,
>                    SequenceFile.CompressionType.NONE);
>            for (int i = 0; i < splits.length; ++i) {
>                w.append(splits[i], NullWritable.get());
>            }
>        } finally {
>            if (null != w)
>                w.close();
>        }
>        return p;
>    }
>
>    private static void log(String message) {
>        System.out.println(message);
>    }
> }
>
> 2) MyKeyComparator
> I've wrote it to pass test above.
>
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.RawComparator;
>
> public class MyKeyComparator implements
> RawComparator<ImmutableBytesWritable> {
>
>    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2)
> {
>        return Bytes.compareTo(o1.get(), o2.get());
>    }
>
>    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
> {
>        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
>    }
> }
>
> 3) MySampler
> this code is based on InputSampler from
> http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
> BUT I've put the following string into getSample:
>            reader.initialize(splits.get(i), new
> TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
> without that string it doesn't work
>
>
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Random;
> import java.util.StringTokenizer;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.RawComparator;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.RecordReader;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
> import org.apache.hadoop.mapreduce.TaskAttemptID;
> import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
> import org.apache.hadoop.util.ReflectionUtils;
>
> /**
>  * based on org.apache.hadoop.mapreduce.lib.partition.InputSampler
>  */
> public class MySampler extends Configured  {
>      private static final Log LOG = LogFactory.getLog(MySampler.class);
>
>      public MySampler(Configuration conf) {
>        setConf(conf);
>      }
>
>      /**
>       * Sample from random points in the input.
>       * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs
>       * from each split.
>       */
>      public static class RandomSampler {
>        private double freq;
>        private final int numSamples;
>        private final int maxSplitsSampled;
>
>        /**
>         * Create a new RandomSampler sampling <em>all</em> splits.
>         * This will read every split at the client, which is very expensive.
>         * @param freq Probability with which a key will be chosen.
>         * @param numSamples Total number of samples to obtain from all
>         *                   selected splits.
>         */
>        public RandomSampler(double freq, int numSamples) {
>          this(freq, numSamples, Integer.MAX_VALUE);
>        }
>
>        /**
>         * Create a new RandomSampler.
>         * @param freq Probability with which a key will be chosen.
>         * @param numSamples Total number of samples to obtain from all
>         *                   selected splits.
>         * @param maxSplitsSampled The maximum number of splits to examine.
>         */
>        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
>          this.freq = freq;
>          this.numSamples = numSamples;
>          this.maxSplitsSampled = maxSplitsSampled;
>        }
>
>        /**
>         * Randomize the split order, then take the specified number of keys
>         * from each split sampled, where each key is selected with the
>         * specified probability and possibly replaced by a subsequently
>         * selected key when the quota of keys from that split is satisfied.
>         */
>        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
>        public ImmutableBytesWritable[] getSample(
>                InputFormat<ImmutableBytesWritable, Text> inf, Job job)
>                throws IOException, InterruptedException {
>          List<InputSplit> splits = inf.getSplits(job);
>          ArrayList<ImmutableBytesWritable> samples =
>              new ArrayList<ImmutableBytesWritable>(numSamples);
>          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
>
>          Random r = new Random();
>          long seed = r.nextLong();
>          r.setSeed(seed);
>          LOG.debug("seed: " + seed);
>          // shuffle splits
>          for (int i = 0; i < splits.size(); ++i) {
>            InputSplit tmp = splits.get(i);
>            int j = r.nextInt(splits.size());
>            splits.set(i, splits.get(j));
>            splits.set(j, tmp);
>          }
>          // our target rate is in terms of the maximum number of sample splits,
>          // but we accept the possibility of sampling additional splits to hit
>          // the target sample keyset
>          for (int i = 0; i < splitsToSample ||
>                         (i < splits.size() && samples.size() < numSamples); ++i) {
>            RecordReader<ImmutableBytesWritable, Text> reader =
>                inf.createRecordReader(splits.get(i),
>                    new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
>
>            reader.initialize(splits.get(i),
>                new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
>
>            while (reader.nextKeyValue()) {
>              if (r.nextDouble() <= freq) {
>                if (samples.size() < numSamples) {
>                  samples.add(composeKey(reader.getCurrentValue()));
>                } else {
>                  // When exceeding the maximum number of samples, replace a
>                  // random element with this one, then adjust the frequency
>                  // to reflect the possibility of existing elements being
>                  // pushed out
>                  int ind = r.nextInt(numSamples);
>                  if (ind != numSamples) {
>                    samples.set(ind, composeKey(reader.getCurrentValue()));
>                  }
>                  freq *= (numSamples - 1) / (double) numSamples;
>                }
>              }
>            }
>            reader.close();
>          }
>          return (ImmutableBytesWritable[]) samples.toArray(
>              new ImmutableBytesWritable[samples.size()]);
>        }
>
>      }
>
>      private static ImmutableBytesWritable composeKey(Text value) {
>          StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
>          int cnt = 0;
>          String[] vals = new String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
>          while (itr.hasMoreTokens()) {
>              vals[cnt] = itr.nextToken();
>              cnt++;
>          }
>
>          String newKeyStr = AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
>                  AdvancedRawLogUploader.NUM_KEY_FILEDS,
>                  AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
>          info(newKeyStr);
>          ImmutableBytesWritable newKey =
>                  new ImmutableBytesWritable(Bytes.toBytes(newKeyStr));
>          return newKey;
>      }
>
>      /**
>       * Write a partition file for the given job, using the Sampler provided.
>       * Queries the sampler for a sample keyset, sorts by the output key
>       * comparator, selects the keys for each rank, and writes to the destination
>       * returned from {@link TotalOrderPartitioner#getPartitionFile}.
>       */
>      @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
>      public static void writePartitionFile(Job job, RandomSampler sampler)
>          throws IOException, ClassNotFoundException, InterruptedException {
>        Configuration conf = job.getConfiguration();
>        final InputFormat inf =
>            ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
>        int numPartitions = job.getNumReduceTasks();
>        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
>        LOG.info("Using " + samples.length + " samples");
>        RawComparator<ImmutableBytesWritable> comparator =
>          (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
>        Arrays.sort(samples, comparator);
>        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
>        FileSystem fs = dst.getFileSystem(conf);
>        if (fs.exists(dst)) {
>          fs.delete(dst, false);
>        }
>        SequenceFile.Writer writer = SequenceFile.createWriter(fs,
>          conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
>        NullWritable nullValue = NullWritable.get();
>        float stepSize = samples.length / (float) numPartitions;
>        int last = -1;
>        for(int i = 1; i < numPartitions; ++i) {
>          int k = Math.round(stepSize * i);
>          while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
>            ++k;
>          }
>          writer.append(samples[k], nullValue);
>          last = k;
>        }
>        writer.close();
>      }
>
>      private static void info(String message) {
> //          LOG.info(message);
>          System.out.println(message);
>      }
>
>
> }
>
> 4) and finally the definition of my MR job:
>
> MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000, 10);
> in = in.makeQualified(in.getFileSystem(conf));
> Path partitionFile = new Path(in.getParent(), "_partitions");
> // Use TotalOrderPartitioner based on the new API:
> // from http://issues.apache.org/jira/browse/MAPREDUCE-366
> TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
> URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
> DistributedCache.addCacheFile(partitionUri, conf);
> DistributedCache.createSymlink(conf);
>
> Job job = new Job(conf, "Write HFiles at " + out.getName());
> job.setNumReduceTasks(numReduceTasks);
> job.setJarByClass(MyHFilesWriter.class);
> job.setMapperClass(MyMapper.class);
> job.setMapOutputKeyClass(ImmutableBytesWritable.class);
> job.setMapOutputValueClass(KeyValue.class);
> job.setReducerClass(KeyValueSortReducer.class);
> job.setOutputFormatClass(HFileOutputFormat.class);
>
> //job.setSortComparatorClass(MyKeyComparator.class);
> // if you uncomment the code above you will get:
> /*
> 10/03/24 15:00:43 INFO mapred.JobClient: Task Id : attempt_201003171417_0063_r_000000_0, Status : FAILED
> java.io.IOException: Added a key not lexically larger than previous key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag, lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
>    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
>    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
>    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
>    at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
>    at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
>    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
>    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>    at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
>    at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
>    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>    at org.apache.hadoop.mapred.Child.main(Child.java:170)
> */
>
> FileOutputFormat.setOutputPath(job, out);
> FileInputFormat.addInputPath(job, in);
> job.setPartitionerClass(TotalOrderPartitioner.class);
> MySampler.writePartitionFile(job, sampler);
>
> System.exit(job.waitForCompletion(true) ? 0 : 1);
>
> So if I do not use the MyKeyComparator class
> (job.setSortComparatorClass(MyKeyComparator.class);) then nothing changes:
> the job works, but the values for some keys do not appear in the table.
> With MyKeyComparator, the job fails with "Added a key not lexically
> larger than previous key".
>
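> (In case it helps to reproduce this: a rough, throwaway sketch of how the
> generated _partitions file could be dumped to eyeball the split keys. The class
> name and the path argument are just placeholders; the file is a SequenceFile of
> ImmutableBytesWritable keys and NullWritable values, as written by
> MySampler.writePartitionFile above.)
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.SequenceFile;
>
> public class DumpPartitionFile {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         Path p = new Path(args[0]); // e.g. the _partitions file created above
>         FileSystem fs = p.getFileSystem(conf);
>         SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
>         try {
>             ImmutableBytesWritable key = new ImmutableBytesWritable();
>             NullWritable val = NullWritable.get();
>             // print each split key; there should be numReduceTasks - 1 of them
>             while (reader.next(key, val)) {
>                 System.out.println(Bytes.toString(key.get()));
>             }
>         } finally {
>             reader.close();
>         }
>     }
> }
>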
> What am I doing wrong? I want to run my MR job with more than one reducer and
> get all of the data into the HBase table after loadtable.rb execution.
> Thank you for reading this far; I hope you didn't get a headache :)
>
> Ruslan Salyakhov | ruslan@jalent.ru
>