You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/04/13 00:25:47 UTC

svn commit: r1325555 - in /hbase/trunk/src: docbkx/ main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: larsh
Date: Thu Apr 12 22:25:46 2012
New Revision: 1325555

URL: http://svn.apache.org/viewvc?rev=1325555&view=rev
Log:
HBASE-5604 M/R tool to replay WAL files

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
Modified:
    hbase/trunk/src/docbkx/ops_mgt.xml

Modified: hbase/trunk/src/docbkx/ops_mgt.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docbkx/ops_mgt.xml?rev=1325555&r1=1325554&r2=1325555&view=diff
==============================================================================
--- hbase/trunk/src/docbkx/ops_mgt.xml (original)
+++ hbase/trunk/src/docbkx/ops_mgt.xml Thu Apr 12 22:25:46 2012
@@ -148,7 +148,24 @@
        This page currently exists on the website and will eventually be migrated into the RefGuide.
        </para>
     </section>
-\    <section xml:id="rowcounter">
+    <section xml:id="walplayer">
+       <title>WALPlayer</title>
+       <para>WALPlayer is a utility to replay WAL files into HBase.
+       </para>
+       <para>The WAL can be replayed for a set of tables or all tables, and a timerange can be provided (in milliseconds). The WAL is filtered to this set of tables. The output can optionally be mapped to another set of tables.
+       </para>
+       <para>WALPlayer can also generate HFiles for later bulk importing, in that case only a single table and no mapping can be specified.
+       </para>
+       <para>Invoke via:
+<programlisting>$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] &lt;wal inputdir&gt; &lt;tables&gt; [&lt;tableMappings>]&gt;
+</programlisting>
+       </para>
+       <para>For example:
+<programlisting>$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir oldTable1,oldTable2 newTable1,newTable2
+</programlisting>
+       </para>
+    </section>
+    <section xml:id="rowcounter">
        <title>RowCounter</title>
        <para>RowCounter is a utility that will count all the rows of a table.  This is a good utility to use
        as a sanity check to ensure that HBase can read all the blocks of a table if there are any concerns of metadata inconsistency.

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java?rev=1325555&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java Thu Apr 12 22:25:46 2012
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple {@link InputFormat} for {@link HLog} files.
+ */
+@InterfaceAudience.Public
+public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
+  private static Log LOG = LogFactory.getLog(HLogInputFormat.class);
+
+  public static String START_TIME_KEY = "hlog.start.time";
+  public static String END_TIME_KEY = "hlog.end.time";
+
+  /**
+   * {@link InputSplit} for {@link HLog} files. Each split represent
+   * exactly one log file.
+   */
+  static class HLogSplit extends InputSplit implements Writable {
+    private String logFileName;
+    private long fileSize;
+    private long startTime;
+    private long endTime;
+
+    /** for serialization */
+    public HLogSplit() {}
+
+    /**
+     * Represent an HLogSplit, i.e. a single HLog file.
+     * Start- and EndTime are managed by the split, so that HLog files can be
+     * filtered before WALEdits are passed to the mapper(s).
+     * @param logFileName
+     * @param fileSize
+     * @param startTime
+     * @param endTime
+     */
+    public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
+      this.logFileName = logFileName;
+      this.fileSize = fileSize;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSize;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      // TODO: Find the data node with the most blocks for this HLog?
+      return new String[] {};
+    }
+
+    public String getLogFileName() {
+      return logFileName;
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      logFileName = in.readUTF();
+      fileSize = in.readLong();
+      startTime = in.readLong();
+      endTime = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(logFileName);
+      out.writeLong(fileSize);
+      out.writeLong(startTime);
+      out.writeLong(endTime);
+    }
+
+    @Override
+    public String toString() {
+      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
+    }
+  }
+
+  /**
+   * {@link RecordReader} for an {@link HLog} file.
+   */
+  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
+    private HLog.Reader reader = null;
+    private HLog.Entry currentEntry = new HLog.Entry();
+    private long startTime;
+    private long endTime;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      HLogSplit hsplit = (HLogSplit)split;
+      Path logFile = new Path(hsplit.getLogFileName());
+      Configuration conf = context.getConfiguration();
+      LOG.info("Opening reader for "+split);
+      try {
+        this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
+      } catch (EOFException x) {
+        LOG.info("Ignoring corrupted HLog file: " + logFile
+            + " (This is normal when a RegionServer crashed.)");
+      }
+      this.startTime = hsplit.getStartTime();
+      this.endTime = hsplit.getEndTime();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader == null) return false;
+
+      HLog.Entry temp;
+      long i = -1;
+      do {
+        // skip older entries
+        try {
+          temp = reader.next(currentEntry);
+          i++;
+        } catch (EOFException x) {
+          LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+              + " (This is normal when a RegionServer crashed.)");
+          return false;
+        }
+      }
+      while(temp != null && temp.getKey().getWriteTime() < startTime);
+
+      if (temp == null) {
+        if (i > 0) LOG.info("Skipped " + i + " entries.");
+        LOG.info("Reached end of file.");
+        return false;
+      } else if (i > 0) {
+        LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+      }
+      boolean res = temp.getKey().getWriteTime() <= endTime;
+      if (!res) {
+        LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
+      }
+      return res;
+    }
+
+    @Override
+    public HLogKey getCurrentKey() throws IOException, InterruptedException {
+      return currentEntry.getKey();
+    }
+
+    @Override
+    public WALEdit getCurrentValue() throws IOException, InterruptedException {
+      return currentEntry.getEdit();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // N/A depends on total number of entries, which is unknown
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      LOG.info("Closing reader");
+      if (reader != null) this.reader.close();
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path inputDir = new Path(conf.get("mapred.input.dir"));
+
+    long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
+    long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);
+
+    FileSystem fs = inputDir.getFileSystem(conf);
+    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
+
+    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
+    for (FileStatus file : files) {
+      splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
+    }
+    return splits;
+  }
+
+  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
+      throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    LOG.debug("Scanning " + dir.toString() + " for HLog files");
+
+    FileStatus[] files = fs.listStatus(dir);
+    if (files == null) return Collections.emptyList();
+    for (FileStatus file : files) {
+      if (file.isDir()) {
+        // recurse into sub directories
+        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
+      } else {
+        String name = file.getPath().toString();
+        int idx = name.lastIndexOf('.');
+        if (idx > 0) {
+          try {
+            long fileStartTime = Long.parseLong(name.substring(idx+1));
+            if (fileStartTime <= endTime) {
+              LOG.info("Found: " + name);
+              result.add(file);
+            }
+          } catch (NumberFormatException x) {
+            idx = 0;
+          }
+        }
+        if (idx == 0) {
+          LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new HLogRecordReader();
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1325555&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Thu Apr 12 22:25:46 2012
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to replay WAL files as a M/R job.
+ * The WAL can be replayed for a set of tables or all tables,
+ * and a timerange can be provided (in milliseconds).
+ * The WAL is filtered to the passed set of tables and  the output
+ * can optionally be mapped to another set of tables.
+ *
+ * WAL replay can also generate HFiles for later bulk importing,
+ * in that case the WAL is replayed for a single table only.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class WALPlayer extends Configured implements Tool {
+  final static String NAME = "WALPlayer";
+  final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
+  final static String HLOG_INPUT_KEY = "hlog.input.dir";
+  final static String TABLES_KEY = "hlog.input.tables";
+  final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
+
+  /**
+   * A mapper that just writes out KeyValues.
+   * This one can be used together with {@link KeyValueSortReducer}
+   */
+  static class HLogKeyValueMapper
+  extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    private byte[] table;
+
+    @Override
+    public void map(HLogKey key, WALEdit value,
+      Context context)
+    throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename())) {
+          for (KeyValue kv : value.getKeyValues()) {
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+            context.write(new ImmutableBytesWritable(kv.getRow()), kv);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // only a single table is supported when HFiles are generated with HFileOutputFormat
+      String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tables == null || tables.length != 1) {
+        // this can only happen when HLogMapper is used directly by a class other than WALPlayer
+        throw new IOException("Exactly one table must be specified for bulk HFile case.");
+      }
+      table = Bytes.toBytes(tables[0]);
+    }
+  }
+
+  /**
+   * A mapper that writes out {@link Mutation} to be directly applied to
+   * a running HBase instance.
+   */
+  static class HLogMapper
+  extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    private Map<byte[], byte[]> tables = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+
+    @Override
+    public void map(HLogKey key, WALEdit value,
+      Context context)
+    throws IOException {
+      try {
+        if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
+          byte[] targetTable = tables.isEmpty() ?
+                key.getTablename() :
+                tables.get(key.getTablename());
+          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable);
+          Put put = null;
+          Delete del = null;
+          KeyValue lastKV = null;
+          for (KeyValue kv : value.getKeyValues()) {
+            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+
+            // A WALEdit may contain multiple operations (HBASE-3584) and/or
+            // multiple rows (HBASE-5229).
+            // Aggregate as much as possible into a single Put/Delete
+            // operation before writing to the context.
+            if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+              // row or type changed, write out aggregate KVs.
+              if (put != null) context.write(tableOut, put);
+              if (del != null) context.write(tableOut, del);
+
+              if (kv.isDelete()) {
+                del = new Delete(kv.getRow());
+              } else {
+                put = new Put(kv.getRow());
+              }
+            }
+            if (kv.isDelete()) {
+              del.addDeleteMarker(kv);
+            } else {
+              put.add(kv);
+            }
+            lastKV = kv;
+          }
+          // write residual KVs
+          if (put != null) context.write(tableOut, put);
+          if (del != null) context.write(tableOut, del);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
+      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+        // this can only happen when HLogMapper is used directly by a class other than WALPlayer
+        throw new IOException("No tables or incorrect table mapping specified.");
+      }
+      int i = 0;
+      for (String table : tablesToUse) {
+        tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++]));
+      }
+    }
+  }
+
+  /**
+   * @param conf The {@link Configuration} to use.
+   */
+  public WALPlayer(Configuration conf) {
+    super(conf);
+  }
+
+  void setupTime(Configuration conf, String option) throws IOException {
+    String val = conf.get(option);
+    if (val == null) return;
+    long ms;
+    try {
+      // first try to parse in user friendly form
+      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
+    } catch (ParseException pe) {
+      try {
+        // then see if just a number of ms's was specified
+        ms = Long.parseLong(val);
+      } catch (NumberFormatException nfe) {
+        throw new IOException(option
+            + " must be specified either in the form 2001-02-20T16:35:06.99 "
+            + "or as number of milliseconds");
+      }
+    }
+    conf.setLong(option, ms);
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args)
+  throws IOException {
+    Configuration conf = getConf();
+    setupTime(conf, HLogInputFormat.START_TIME_KEY);
+    setupTime(conf, HLogInputFormat.END_TIME_KEY);
+    Path inputDir = new Path(args[0]);
+    String[] tables = args[1].split(",");
+    String[] tableMap;
+    if (args.length > 2) {
+      tableMap = args[2].split(",");
+      if (tableMap.length != tables.length) {
+        throw new IOException("The same number of tables and mapping must be provided.");
+      }
+    } else {
+      // if not mapping is specified map each table to itself
+      tableMap = tables;
+    }
+    conf.setStrings(TABLES_KEY, tables);
+    conf.setStrings(TABLE_MAP_KEY, tableMap);
+    Job job = new Job(conf, NAME + "_" + inputDir);
+    job.setJarByClass(WALPlayer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(HLogInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      // the bulk HFile case
+      if (tables.length != 1) {
+        throw new IOException("Exactly one table must be specified for the bulk export option");
+      }
+      HTable table = new HTable(conf, tables[0]);
+      job.setMapperClass(HLogKeyValueMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
+    } else {
+      // output to live cluster
+      job.setMapperClass(HLogMapper.class);
+      job.setOutputFormatClass(MultiTableOutputFormat.class);
+      TableMapReduceUtil.addDependencyJars(job);
+      // No reducers.
+      job.setNumReduceTasks(0);
+    }
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
+    System.err.println("Read all WAL entries for <tables>.");
+    System.err.println("If no tables (\"\") are specific, all tables are imported.");
+    System.err.println("(Careful, even -ROOT- and .META. entries will be imported in that case.)");
+    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
+    System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
+    System.err.println("<tableMapping> is a command separated list of targettables.");
+    System.err.println("If specified, each table in <tables> must have a mapping.\n");
+    System.err.println("By default " + NAME + " will load data directly into HBase.");
+    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
+    System.err.println("Other options: (specify time range to WAL edit to consider)");
+    System.err.println("  -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapred.map.tasks.speculative.execution=false\n"
+        + "  -Dmapred.reduce.tasks.speculative.execution=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java?rev=1325555&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java Thu Apr 12 22:25:46 2012
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the HLogRecordReader
+ */
+@Category(MediumTests.class)
+public class TestHLogRecordReader {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  private static final byte [] tableName = Bytes.toBytes(getName());
+  private static final byte [] rowName = tableName;
+  private static final HRegionInfo info = new HRegionInfo(tableName,
+      Bytes.toBytes(""), Bytes.toBytes(""), false);
+  private static final byte [] family = Bytes.toBytes("column");
+  private static final byte [] value = Bytes.toBytes("value");
+  private static HTableDescriptor htd;
+  private static Path logDir;
+  private static Path oldLogDir;
+
+  private static String getName() {
+    return "TestHLogRecordReader";
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(hbaseDir);
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+
+  }
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test partial reads from the log based on passed time range
+   * @throws Exception
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    HLog log = new HLog(fs, logDir, oldLogDir, conf);
+    long ts = System.currentTimeMillis();
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+        ts, value));
+    log.append(info, tableName, edit,
+      ts, htd);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+        ts+1, value));
+    log.append(info, tableName, edit,
+        ts+1, htd);
+    log.rollWriter();
+
+    Thread.sleep(1);
+    long ts1 = System.currentTimeMillis();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"),
+        ts1+1, value));
+    log.append(info, tableName, edit,
+        ts1+1, htd);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"),
+        ts1+2, value));
+    log.append(info, tableName, edit,
+        ts1+2, htd);
+    log.close();
+
+    HLogInputFormat input = new HLogInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapred.input.dir", logDir.toString());
+    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);
+
+    // only 1st file is considered, and only its 1st entry is used
+    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1);
+    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1);
+    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    // both files need to be considered
+    assertEquals(2, splits.size());
+    // only the 2nd entry from the 1st file is used
+    testSplit(splits.get(0), Bytes.toBytes("2"));
+    // only the 1nd entry from the 2nd file is used
+    testSplit(splits.get(1), Bytes.toBytes("3"));
+  }
+
+  /**
+   * Test basic functionality
+   * @throws Exception
+   */
+  @Test
+  public void testHLogRecordReader() throws Exception {
+    HLog log = new HLog(fs, logDir, oldLogDir, conf);
+    byte [] value = Bytes.toBytes("value");
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+        System.currentTimeMillis(), value));
+    log.append(info, tableName, edit,
+      System.currentTimeMillis(), htd);
+
+    Thread.sleep(1); // make sure 2nd log gets a later timestamp
+    long secondTs = System.currentTimeMillis();
+    log.rollWriter();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+        System.currentTimeMillis(), value));
+    log.append(info, tableName, edit,
+      System.currentTimeMillis(), htd);
+    log.close();
+    long thirdTs = System.currentTimeMillis();
+
+    // should have 2 log files now
+    HLogInputFormat input = new HLogInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapred.input.dir", logDir.toString());
+
+    // make sure both logs are found
+    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    assertEquals(2, splits.size());
+
+    // should return exactly one KV
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+    // same for the 2nd split
+    testSplit(splits.get(1), Bytes.toBytes("2"));
+
+    // now test basic time ranges:
+
+    // set an endtime, the 2nd log file can be ignored completely.
+    jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1);
+    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    // now set a start time
+    jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+    jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
+    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    // both logs need to be considered
+    assertEquals(2, splits.size());
+    // but both readers skip all edits
+    testSplit(splits.get(0));
+    testSplit(splits.get(1));
+  }
+
+  /**
+   * Create a new reader from the split, and match the edits against the passed columns.
+   */
+  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+    HLogRecordReader reader = new HLogRecordReader();
+    reader.initialize(split, new TaskAttemptContext(conf, new TaskAttemptID()));
+
+    for (byte[] column : columns) {
+      assertTrue(reader.nextKeyValue());
+      assertTrue(Bytes
+          .equals(column, reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
+    }
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java?rev=1325555&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java Thu Apr 12 22:25:46 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Basic test for the WALPlayer M/R tool
+ */
+public class TestWALPlayer {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    cluster = TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Simple end-to-end test
+   * @throws Exception
+   */
+  @Test
+  public void testWALPlayer() throws Exception {
+    final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
+    final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
+    HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.add(FAMILY, COLUMN1, COLUMN1);
+    p.add(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.deleteColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    HLog log = cluster.getRegionServer(0).getWAL();
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
+        .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+
+    WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
+    assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
+        Bytes.toString(TABLENAME2) }));
+
+    // verify the WAL was player into table 2
+    Get g = new Get(ROW);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier()));
+  }
+
+  /**
+   * Simple test for data parsing
+   * @throws Exception
+   */
+  @Test
+  public void testTimeFormat() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
+    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
+    // make sure if nothing is specified nothing is set
+    assertNull(conf.get(HLogInputFormat.END_TIME_KEY));
+    // test a textual data (including ms)
+    conf.set(HLogInputFormat.END_TIME_KEY, "2012-4-10T14:21:01.01");
+    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
+    assertEquals(1334092861001L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0));
+    // test with mss as a long
+    conf.set(HLogInputFormat.END_TIME_KEY, "1334092861010");
+    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
+    assertEquals(1334092861010L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0));
+  }
+}