You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:13 UTC

[13/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
deleted file mode 100644
index bf11473..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
-import org.apache.hadoop.hbase.client.IsolationLevel;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.Writable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class TableSnapshotInputFormatImpl {
-  // TODO: Snapshots files are owned in fs by the hbase user. There is no
-  // easy way to delegate access.
-
-  // Logger for this class; the nested InputSplit class logs through it as well.
-  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
-
-  // Configuration key holding the name of the snapshot to read; written by setInput and
-  // read back by getSnapshotName.
-  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
-  // key for specifying the root dir of the restored snapshot
-  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
-
-  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
-  private static final String LOCALITY_CUTOFF_MULTIPLIER =
-    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
-  // Used when LOCALITY_CUTOFF_MULTIPLIER is not set: hosts need at least 80% of the top
-  // host's weight to be reported as a split location.
-  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
-
-  /**
-   * Implementation class for InputSplit logic common between mapred and mapreduce.
-   * <p>
-   * A split carries the table descriptor, the region to read, the preferred host locations,
-   * the serialized scan and the directory the snapshot was restored into. It is serialized as
-   * a length-prefixed {@link TableSnapshotRegionSplit} protobuf message followed by the scan
-   * and restore directory strings.
-   */
-  public static class InputSplit implements Writable {
-
-    private TableDescriptor htd;
-    private HRegionInfo regionInfo;
-    private String[] locations;
-    // Serialized form of the scan; "" stands for "no scan"
-    private String scan;
-    private String restoreDir;
-
-    // constructor for mapreduce framework / Writable
-    public InputSplit() {}
-
-    /**
-     * Creates a split for one region of a restored snapshot.
-     *
-     * @param htd descriptor of the table the region belongs to
-     * @param regionInfo the region this split covers
-     * @param locations preferred hosts; {@code null} or empty results in no locations
-     * @param scan the scan to run; {@code null} is stored as the empty string
-     * @param restoreDir directory the snapshot was restored into; must not be {@code null}
-     */
-    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
-        Scan scan, Path restoreDir) {
-      this.htd = htd;
-      this.regionInfo = regionInfo;
-      if (locations == null || locations.isEmpty()) {
-        this.locations = new String[0];
-      } else {
-        this.locations = locations.toArray(new String[locations.size()]);
-      }
-
-      // Start from the "no scan" value so that a failed conversion below never leaves the
-      // field null; write() would otherwise hand a null string to Bytes.toBytes.
-      this.scan = "";
-      try {
-        if (scan != null) {
-          this.scan = TableMapReduceUtil.convertScanToString(scan);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to convert Scan to String", e);
-      }
-
-      this.restoreDir = restoreDir.toString();
-    }
-
-    /** @return the table descriptor; same value as {@link #getTableDescriptor()} */
-    public TableDescriptor getHtd() {
-      return htd;
-    }
-
-    /** @return the serialized scan, or the empty string when no scan is attached */
-    public String getScan() {
-      return scan;
-    }
-
-    /** @return the directory the snapshot was restored into, as a string */
-    public String getRestoreDir() {
-      return restoreDir;
-    }
-
-    /** @return always 0; the size of the snapshot files is not computed yet */
-    public long getLength() {
-      //TODO: We can obtain the file sizes of the snapshot here.
-      return 0;
-    }
-
-    /** @return the preferred hosts for this split (the internal array, not a copy) */
-    public String[] getLocations() {
-      return locations;
-    }
-
-    /** @return the table descriptor; same value as {@link #getHtd()} */
-    public TableDescriptor getTableDescriptor() {
-      return htd;
-    }
-
-    /** @return the region this split covers */
-    public HRegionInfo getRegionInfo() {
-      return regionInfo;
-    }
-
-    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
-    // doing this wrapping with Writables.
-    /**
-     * Writes the table schema, region and locations as a length-prefixed protobuf message,
-     * followed by the scan and the restore directory as byte arrays.
-     *
-     * @param out the output to write to
-     * @throws IOException if writing fails
-     */
-    @Override
-    public void write(DataOutput out) throws IOException {
-      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
-          .setTable(ProtobufUtil.toTableSchema(htd))
-          .setRegion(HRegionInfo.convert(regionInfo));
-
-      for (String location : locations) {
-        builder.addLocations(location);
-      }
-
-      TableSnapshotRegionSplit split = builder.build();
-
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      split.writeTo(baos);
-      baos.close();
-      byte[] buf = baos.toByteArray();
-      // Length prefix lets readFields know how many bytes belong to the protobuf message
-      out.writeInt(buf.length);
-      out.write(buf);
-
-      Bytes.writeByteArray(out, Bytes.toBytes(scan));
-      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
-
-    }
-
-    /**
-     * Reads the fields in the order {@link #write(DataOutput)} emitted them.
-     *
-     * @param in the input to read from
-     * @throws IOException if reading or parsing fails
-     */
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int len = in.readInt();
-      byte[] buf = new byte[len];
-      in.readFully(buf);
-      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
-      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
-      this.regionInfo = HRegionInfo.convert(split.getRegion());
-      List<String> locationsList = split.getLocationsList();
-      this.locations = locationsList.toArray(new String[locationsList.size()]);
-
-      this.scan = Bytes.toString(Bytes.readByteArray(in));
-      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
-    }
-  }
-
-  /**
-   * Record-reading logic shared by the mapred and mapreduce flavours of the input format.
-   * Rows come from a {@link ClientSideRegionScanner} that is opened in
-   * {@link #initialize(InputSplit, Configuration)}.
-   */
-  public static class RecordReader {
-    private InputSplit split;
-    private Scan scan;
-    private Result result = null;
-    private ImmutableBytesWritable row = null;
-    private ClientSideRegionScanner scanner;
-
-    /** @return the scanner opened by initialize, or null before initialize has run */
-    public ClientSideRegionScanner getScanner() {
-      return scanner;
-    }
-
-    /**
-     * Decodes the scan carried by the split and opens a scanner over the restored region.
-     *
-     * @param split the split describing the region to read
-     * @param conf configuration used to locate the file system and build the scanner
-     * @throws IOException if the scan cannot be decoded or the scanner cannot be opened
-     */
-    public void initialize(InputSplit split, Configuration conf) throws IOException {
-      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
-      this.split = split;
-      final TableDescriptor tableDescriptor = split.htd;
-      final HRegionInfo regionInfo = this.split.getRegionInfo();
-      final FileSystem fileSystem = FSUtils.getCurrentFileSystem(conf);
-
-      // The region is immutable, so reading uncommitted data is fine here; otherwise the
-      // thread read point would have to be set.
-      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
-      // Data blocks are not cached for this scan
-      scan.setCacheBlocks(false);
-
-      scanner = new ClientSideRegionScanner(conf, fileSystem, new Path(split.restoreDir),
-          tableDescriptor, regionInfo, scan, null);
-    }
-
-    /**
-     * Advances to the next row.
-     *
-     * @return true if a row was read, false once the scanner is exhausted
-     * @throws IOException if the scanner fails
-     */
-    public boolean nextKeyValue() throws IOException {
-      result = scanner.next();
-      if (result != null) {
-        // The key holder is created on first use and reused for every following row
-        if (row == null) {
-          row = new ImmutableBytesWritable();
-        }
-        row.set(result.getRow());
-        return true;
-      }
-      // scanner is exhausted
-      return false;
-    }
-
-    /** @return the row key of the current row, or null before the first row was read */
-    public ImmutableBytesWritable getCurrentKey() {
-      return row;
-    }
-
-    /** @return the current row, or null when there is none */
-    public Result getCurrentValue() {
-      return result;
-    }
-
-    /** @return always 0 */
-    public long getPos() {
-      return 0;
-    }
-
-    /** @return always 0 for now */
-    public float getProgress() {
-      return 0; // TODO: use total bytes to estimate
-    }
-
-    /** Closes the scanner if one was opened. */
-    public void close() {
-      if (scanner == null) {
-        return;
-      }
-      scanner.close();
-    }
-  }
-
-  /**
-   * Computes the splits for the snapshot configured in {@code conf}: the snapshot name, the
-   * scan and the restore directory are all taken from the configuration.
-   *
-   * @param conf configuration carrying the snapshot name, scan and restore directory
-   * @return one split per region that overlaps the scan range
-   * @throws IOException if the snapshot manifest or the scan cannot be read
-   */
-  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
-    final String name = getSnapshotName(conf);
-    final Path hbaseRoot = FSUtils.getRootDir(conf);
-    final FileSystem rootFs = hbaseRoot.getFileSystem(conf);
-
-    final SnapshotManifest snapshotManifest = getSnapshotManifest(conf, name, hbaseRoot, rootFs);
-    final List<HRegionInfo> regions = getRegionInfosFromManifest(snapshotManifest);
-
-    // TODO: mapred does not support scan as input API. Work around for now.
-    final Scan configuredScan = extractScanFromConf(conf);
-    // temporary directory the snapshot has been restored into
-    final Path restoredInto = new Path(conf.get(RESTORE_DIR_KEY));
-
-    return getSplits(configuredScan, snapshotManifest, regions, restoredInto, conf);
-  }
-
-  /**
-   * Extracts the regions recorded in a snapshot manifest, leaving out offline regions that
-   * are split or are split parents.
-   *
-   * @param manifest the snapshot manifest to read
-   * @return the regions of the snapshot that remain after filtering
-   * @throws IllegalArgumentException if the manifest has no region manifests
-   */
-  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
-    final List<SnapshotRegionManifest> entries = manifest.getRegionManifests();
-    if (entries == null) {
-      throw new IllegalArgumentException("Snapshot seems empty");
-    }
-
-    final List<HRegionInfo> kept = Lists.newArrayListWithCapacity(entries.size());
-    for (SnapshotRegionManifest entry : entries) {
-      final HRegionInfo region = HRegionInfo.convert(entry.getRegionInfo());
-      final boolean offlineSplit =
-          region.isOffline() && (region.isSplit() || region.isSplitParent());
-      if (!offlineSplit) {
-        kept.add(region);
-      }
-    }
-    return kept;
-  }
-
-  /**
-   * Opens the manifest of a completed snapshot.
-   *
-   * @param conf configuration handed to the manifest
-   * @param snapshotName name of the snapshot
-   * @param rootDir HBase root directory the snapshot directory is resolved against
-   * @param fs file system holding the snapshot
-   * @return the opened snapshot manifest
-   * @throws IOException if the snapshot description or the manifest cannot be read
-   */
-  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
-      Path rootDir, FileSystem fs) throws IOException {
-    final Path completedDir =
-        SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    return SnapshotManifest.open(conf, fs, completedDir,
-        SnapshotDescriptionUtils.readSnapshotInfo(fs, completedDir));
-  }
-
-  /**
-   * Builds the scan described by the configuration. A serialized scan stored under
-   * {@code TableInputFormat.SCAN} takes precedence; otherwise the space-separated list
-   * stored under the mapred {@code COLUMN_LIST} key is added to a new scan as families.
-   *
-   * @param conf configuration to read the scan from
-   * @return the scan to run
-   * @throws IOException if the serialized scan cannot be decoded
-   * @throws IllegalArgumentException if neither key is set
-   */
-  public static Scan extractScanFromConf(Configuration conf) throws IOException {
-    final String serializedScan = conf.get(TableInputFormat.SCAN);
-    if (serializedScan != null) {
-      return TableMapReduceUtil.convertStringToScan(serializedScan);
-    }
-
-    final String columnList =
-        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST);
-    if (columnList == null) {
-      throw new IllegalArgumentException("Unable to create scan");
-    }
-
-    final Scan familyScan = new Scan();
-    for (String family : columnList.split(" ")) {
-      familyScan.addFamily(Bytes.toBytes(family));
-    }
-    return familyScan;
-  }
-
-  /**
-   * Creates one split for every given region whose key range overlaps the scan's row range.
-   * Each split is given at most three host locations.
-   *
-   * @param scan the scan whose start and stop rows select the regions
-   * @param manifest snapshot manifest supplying the table descriptor
-   * @param regionManifests the candidate regions
-   * @param restoreDir directory the snapshot was restored into
-   * @param conf configuration used to compute block locality
-   * @return the splits, in the iteration order of {@code regionManifests}
-   * @throws IOException if the block distribution of a region cannot be computed
-   */
-  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
-      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
-    // table descriptor comes from the manifest
-    final TableDescriptor htd = manifest.getTableDescriptor();
-    final Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
-
-    final List<InputSplit> result = new ArrayList<>();
-    for (HRegionInfo region : regionManifests) {
-      final boolean overlapsScan = CellUtil.overlappingKeys(scan.getStartRow(),
-          scan.getStopRow(), region.getStartKey(), region.getEndKey());
-      if (!overlapsScan) {
-        continue;
-      }
-
-      // HDFS locations are computed from the snapshot files (which resolves the locations
-      // of the referred hfiles)
-      final List<String> bestHosts = getBestLocations(conf,
-          HRegion.computeHDFSBlocksDistribution(conf, htd, region, tableDir));
-      final List<String> topHosts = bestHosts.subList(0, Math.min(3, bestHosts.size()));
-
-      result.add(new InputSplit(htd, region, topHosts, scan, restoreDir));
-    }
-    return result;
-  }
-
-  /**
-   * Computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
-   * weights into account, so every location passed from the input split is treated as equal.
-   * Passing all locations blindly is not useful: there is one split per region, and a
-   * region's blocks are spread throughout the cluster unless favored node assignment is used.
-   * In the expected stable case only one location holds most of the blocks locally, while
-   * with favored node assignment three nodes hold highly local blocks.
-   * <p>
-   * The heuristic used here returns the top host plus every following host that has at least
-   * 80% (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as
-   * the top host. The scan over the hosts stops at the first one below that threshold.
-   *
-   * @param conf configuration supplying the cutoff multiplier
-   * @param blockDistribution block distribution of the region
-   * @return the selected hosts, best first; empty when the distribution has no hosts
-   */
-  public static List<String> getBestLocations(
-      Configuration conf, HDFSBlocksDistribution blockDistribution) {
-    final List<String> selected = new ArrayList<>(3);
-
-    final HostAndWeight[] ranked = blockDistribution.getTopHostsWithWeights();
-    if (ranked.length == 0) {
-      return selected;
-    }
-
-    final HostAndWeight best = ranked[0];
-    selected.add(best.getHost());
-
-    // Heuristic: keep the hosts that reach cutoffMultiplier times the locality of the best one
-    final double cutoffMultiplier
-      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
-    final double threshold = best.getWeight() * cutoffMultiplier;
-
-    // The loop condition ends the scan at the first host below the threshold
-    for (int i = 1; i < ranked.length && ranked[i].getWeight() >= threshold; i++) {
-      selected.add(ranked[i].getHost());
-    }
-
-    return selected;
-  }
-
-  private static String getSnapshotName(Configuration conf) {
-    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
-    if (snapshotName == null) {
-      throw new IllegalArgumentException("Snapshot name must be provided");
-    }
-    return snapshotName;
-  }
-
-  /**
-   * Configures the job to use TableSnapshotInputFormat to read from a snapshot. The snapshot
-   * is copied for scanning into a new, randomly named subdirectory of {@code restoreDir}, and
-   * that subdirectory is recorded in the configuration.
-   * @param conf the job configuration to update
-   * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restoreDir can be deleted.
-   * @throws IOException if an error occurs
-   */
-  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
-      throws IOException {
-    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
-
-    final Path hbaseRoot = FSUtils.getRootDir(conf);
-    final FileSystem rootFs = hbaseRoot.getFileSystem(conf);
-    final Path uniqueRestoreDir = new Path(restoreDir, UUID.randomUUID().toString());
-
-    // TODO: restore from record readers to parallelize.
-    RestoreSnapshotHelper.copySnapshotForScanner(conf, rootFs, hbaseRoot, uniqueRestoreDir,
-        snapshotName);
-
-    conf.set(RESTORE_DIR_KEY, uniqueRestoreDir.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
deleted file mode 100644
index 13c7c67..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * A table split corresponds to a key range (low, high) and an optional scanner.
- * All references to row below refer to the key of the row.
- */
-@InterfaceAudience.Public
-public class TableSplit extends InputSplit
-implements Writable, Comparable<TableSplit> {
-  /** @deprecated LOG variable would be made private. fix in hbase 3.0 */
-  @Deprecated
-  public static final Log LOG = LogFactory.getLog(TableSplit.class);
-
-  // should be < 0 (@see #readFields(DataInput))
-  // version 1 supports Scan data member
-  enum Version {
-    UNVERSIONED(0),
-    // Initial number we put on TableSplit when we introduced versioning.
-    INITIAL(-1),
-    // Added an encoded region name field for easier identification of split -> region
-    WITH_ENCODED_REGION_NAME(-2);
-
-    final int code;
-    static final Version[] byCode;
-    static {
-      byCode = Version.values();
-      for (int i = 0; i < byCode.length; i++) {
-        if (byCode[i].code != -1 * i) {
-          throw new AssertionError("Values in this enum should be descending by one");
-        }
-      }
-    }
-
-    Version(int code) {
-      this.code = code;
-    }
-
-    boolean atLeast(Version other) {
-      return code <= other.code;
-    }
-
-    static Version fromCode(int code) {
-      return byCode[code * -1];
-    }
-  }
-
-  // Version code that write() emits in front of every serialized split
-  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
-  private TableName tableName;
-  private byte [] startRow;
-  private byte [] endRow;
-  private String regionLocation;
-  // Blank unless set through the constructor or read by readFields
-  private String encodedRegionName = "";
-  private String scan = ""; // stores the serialized form of the Scan
-  private long length; // Contains estimation of region size in bytes
-
-  /**
-   * Default constructor. Creates a split with a null table name, no scan, empty start and
-   * end rows and an empty location; presumably meant to be filled in by
-   * {@link #readFields(DataInput)}.
-   */
-  public TableSplit() {
-    this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY, "");
-  }
-
-  /**
-   * Creates a new instance while assigning all variables.
-   * Length of region is set to 0
-   * Encoded name of the region is set to blank
-   *
-   * @param tableName  The name of the current table.
-   * @param scan The scan associated with this split.
-   * @param startRow  The start row of the split.
-   * @param endRow  The end row of the split.
-   * @param location  The location of the region.
-   */
-  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
-                    final String location) {
-    this(tableName, scan, startRow, endRow, location, 0L);
-  }
-
-  /**
-   * Creates a new instance while assigning all variables.
-   * Encoded name of region is set to blank
-   *
-   * @param tableName  The name of the current table.
-   * @param scan The scan associated with this split.
-   * @param startRow  The start row of the split.
-   * @param endRow  The end row of the split.
-   * @param location  The location of the region.
-   * @param length  Size of region in bytes
-   */
-  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
-      final String location, long length) {
-    this(tableName, scan, startRow, endRow, location, "", length);
-  }
-
-  /**
-   * Creates a new instance while assigning all variables.
-   * A null scan is stored as the empty string. If the scan cannot be converted to its string
-   * form, a warning is logged and the stored scan keeps its initial empty value.
-   *
-   * @param tableName  The name of the current table.
-   * @param scan The scan associated with this split.
-   * @param startRow  The start row of the split.
-   * @param endRow  The end row of the split.
-   * @param location  The location of the region.
-   * @param encodedRegionName The region ID.
-   * @param length  Size of region in bytes
-   */
-  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
-      final String location, final String encodedRegionName, long length) {
-    this.tableName = tableName;
-    try {
-      this.scan =
-        (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
-    } catch (IOException e) {
-      LOG.warn("Failed to convert Scan to String", e);
-    }
-    this.startRow = startRow;
-    this.endRow = endRow;
-    this.regionLocation = location;
-    this.encodedRegionName = encodedRegionName;
-    this.length = length;
-  }
-
-  /**
-   * Creates a new instance without a scanner.
-   * Length of region is set to 0
-   *
-   * @param tableName The name of the current table.
-   * @param startRow The start row of the split.
-   * @param endRow The end row of the split.
-   * @param location The location of the region.
-   */
-  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
-      final String location) {
-    this(tableName, null, startRow, endRow, location);
-  }
-
-  /**
-   * Creates a new instance without a scanner.
-   *
-   * @param tableName The name of the current table.
-   * @param startRow The start row of the split.
-   * @param endRow The end row of the split.
-   * @param location The location of the region.
-   * @param length Size of region in bytes
-   */
-  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
-                    final String location, long length) {
-    this(tableName, null, startRow, endRow, location, length);
-  }
-
-  /**
-   * Returns a Scan object from the stored string representation.
-   *
-   * @return Returns a Scan object based on the stored scanner.
-   * @throws IOException When the stored string cannot be converted back into a Scan.
-   */
-  public Scan getScan() throws IOException {
-    return TableMapReduceUtil.convertStringToScan(this.scan);
-  }
-
-  /**
-   * Returns the table name converted to a byte array.
-   * The table name is dereferenced, so this fails with a NullPointerException on a split
-   * whose table name is null.
-   * @see #getTable()
-   * @return The table name.
-   */
-  public byte [] getTableName() {
-    return tableName.getName();
-  }
-
-  /**
-   * Returns the table name.
-   *
-   * @return The table name.
-   */
-  public TableName getTable() {
-    // It is ugly that usually to get a TableName, the method is called getTableName.  We can't do
-    // that in here though because there was an existing getTableName in place already since
-    // deprecated.
-    return tableName;
-  }
-
-  /**
-   * Returns the start row.
-   *
-   * @return The start row.
-   */
-  public byte [] getStartRow() {
-    return startRow;
-  }
-
-  /**
-   * Returns the end row.
-   *
-   * @return The end row.
-   */
-  public byte [] getEndRow() {
-    return endRow;
-  }
-
-  /**
-   * Returns the region location.
-   *
-   * @return The region's location.
-   */
-  public String getRegionLocation() {
-    return regionLocation;
-  }
-
-  /**
-   * Returns the region's location as an array.
-   * A new single-element array is created on every call.
-   *
-   * @return The array containing the region location.
-   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
-   */
-  @Override
-  public String[] getLocations() {
-    return new String[] {regionLocation};
-  }
-
-  /**
-   * Returns the region's encoded name.
-   *
-   * @return The region's encoded name.
-   */
-  public String getEncodedRegionName() {
-    return encodedRegionName;
-  }
-
-  /**
-   * Returns the length of the split.
-   *
-   * @return The length of the split.
-   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
-   */
-  @Override
-  public long getLength() {
-    return length;
-  }
-
-  /**
-   * Reads the values of each field.
-   * The fields are read in the order written by {@link #write(DataOutput)}: optional version
-   * code, table name, start row, end row, region location, scan (only from version INITIAL
-   * on), length, and encoded region name (only from version WITH_ENCODED_REGION_NAME on).
-   *
-   * @param in  The input to read from.
-   * @throws IOException When reading the input fails.
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    Version version = Version.UNVERSIONED;
-    // TableSplit was not versioned in the beginning.
-    // In order to introduce it now, we make use of the fact
-    // that tableName was written with Bytes.writeByteArray,
-    // which encodes the array length as a vint which is >= 0.
-    // Hence if the vint is >= 0 we have an old version and the vint
-    // encodes the length of tableName.
-    // If < 0 we just read the version and the next vint is the length.
-    // @see Bytes#readByteArray(DataInput)
-    int len = WritableUtils.readVInt(in);
-    if (len < 0) {
-      // what we just read was the version
-      version = Version.fromCode(len);
-      len = WritableUtils.readVInt(in);
-    }
-    // At this point len is the length of the table name in both the old and the new format
-    byte[] tableNameBytes = new byte[len];
-    in.readFully(tableNameBytes);
-    tableName = TableName.valueOf(tableNameBytes);
-    startRow = Bytes.readByteArray(in);
-    endRow = Bytes.readByteArray(in);
-    regionLocation = Bytes.toString(Bytes.readByteArray(in));
-    // The scan is only present in versioned data
-    if (version.atLeast(Version.INITIAL)) {
-      scan = Bytes.toString(Bytes.readByteArray(in));
-    }
-    length = WritableUtils.readVLong(in);
-    // The encoded region name was appended by the latest version
-    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
-      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
-    }
-  }
-
-  /**
-   * Writes the field values to the output.
-   * The current version code is written first, followed by table name, start row, end row,
-   * region location, scan, length and encoded region name; {@link #readFields(DataInput)}
-   * reads them back in the same order. The table name is dereferenced, so a split whose
-   * table name is null cannot be written.
-   *
-   * @param out  The output to write to.
-   * @throws IOException When writing the values to the output fails.
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // Negative version code; readFields uses the sign to tell it apart from a name length
-    WritableUtils.writeVInt(out, VERSION.code);
-    Bytes.writeByteArray(out, tableName.getName());
-    Bytes.writeByteArray(out, startRow);
-    Bytes.writeByteArray(out, endRow);
-    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
-    Bytes.writeByteArray(out, Bytes.toBytes(scan));
-    WritableUtils.writeVLong(out, length);
-    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
-  }
-
-  /**
-   * Returns the details about this instance as a string.
-   *
-   * @return The values of this instance as a string.
-   * @see java.lang.Object#toString()
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("HBase table split(");
-    sb.append("table name: ").append(tableName);
-    // null scan input is represented by ""
-    String printScan = "";
-    if (!scan.equals("")) {
-      try {
-        // get the real scan here in toString, not the Base64 string
-        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
-      }
-      catch (IOException e) {
-        printScan = "";
-      }
-    }
-    sb.append(", scan: ").append(printScan);
-    sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
-    sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
-    sb.append(", region location: ").append(regionLocation);
-    sb.append(", encoded region name: ").append(encodedRegionName);
-    sb.append(")");
-    return sb.toString();
-  }
-
-  /**
-   * Orders splits by table name first and, for splits of the same table, by start row.
-   *
-   * @param split  The split to compare to.
-   * @return The result of the comparison.
-   * @see java.lang.Comparable#compareTo(java.lang.Object)
-   */
-  @Override
-  public int compareTo(TableSplit split) {
-    final int byTable = getTable().compareTo(split.getTable());
-    if (byTable != 0) {
-      // different tables: the table names decide
-      return byTable;
-    }
-    // same table: the start rows decide
-    return Bytes.compareTo(getStartRow(), split.getStartRow());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || !(o instanceof TableSplit)) {
-      return false;
-    }
-    return tableName.equals(((TableSplit)o).tableName) &&
-      Bytes.equals(startRow, ((TableSplit)o).startRow) &&
-      Bytes.equals(endRow, ((TableSplit)o).endRow) &&
-      regionLocation.equals(((TableSplit)o).regionLocation);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = tableName != null ? tableName.hashCode() : 0;
-    result = 31 * result + (scan != null ? scan.hashCode() : 0);
-    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
-    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
-    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
-    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
deleted file mode 100644
index 84324e2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Reducer that parses the TSV text lines of a row into KeyValues and emits them sorted.
- * @see HFileOutputFormat2
- * @see KeyValueSortReducer
- * @see PutSortReducer
- */
-@InterfaceAudience.Public
-public class TextSortReducer extends
-    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
-
-  /** Timestamp for all inserted rows */
-  private long ts;
-
-  /** Column separator */
-  private String separator;
-
-  /** Should skip bad lines */
-  private boolean skipBadLines;
-
-  /** Counter incremented for every skipped bad line */
-  private Counter badLineCount;
-
-  /** Parser built in setup() from the configured columns and separator */
-  private ImportTsv.TsvParser parser;
-
-  /** Cell visibility expr **/
-  private String cellVisibilityExpr;
-
-  /** Cell TTL */
-  private long ttl;
-
-  /** Factory used to build the cells that are written out */
-  private CellCreator kvCreator;
-
-  public long getTs() {
-    return ts;
-  }
-
-  public boolean getSkipBadLines() {
-    return skipBadLines;
-  }
-
-  public Counter getBadLineCount() {
-    return badLineCount;
-  }
-
-  public void incrementBadLineCount(int count) {
-    this.badLineCount.increment(count);
-  }
-
-  /**
-   * Initializes the state specific to this class (the parser and the cell creator).
-   * Initialization a subclass might want to reuse lives in <code>doSetup</code>, so a
-   * subclass may override this method and call <code>doSetup</code> itself before
-   * handling its own custom parameters.
-   *
-   * @param context the reducer context supplying the job configuration
-   */
-  @Override
-  protected void setup(Context context) {
-    final Configuration jobConf = context.getConfiguration();
-    doSetup(context, jobConf);
-
-    final String columns = jobConf.get(ImportTsv.COLUMNS_CONF_KEY);
-    parser = new ImportTsv.TsvParser(columns, separator);
-    if (parser.getRowKeyColumnIndex() == -1) {
-      throw new RuntimeException("No row key column specified");
-    }
-    this.kvCreator = new CellCreator(jobConf);
-  }
-
-  /**
-   * Initializes the common parameters (separator, timestamp, bad-line handling) that a
-   * subclass might want to reuse.
-   * @param context the reducer context, used for the bad-line settings and counter
-   * @param conf the configuration the separator and timestamp are read from
-   */
-  protected void doSetup(Context context, Configuration conf) {
-    // A custom separator is stored Base64 encoded; fall back to the default when unset.
-    final String encodedSeparator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
-    separator = (encodedSeparator == null)
-        ? ImportTsv.DEFAULT_SEPARATOR
-        : new String(Base64.decode(encodedSeparator));
-
-    // Should never get 0 as we are setting this to a valid value in job configuration.
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
-
-    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
-    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
-  }
-
-  /**
-   * Tells whether column <code>i</code> holds line metadata (row key, timestamp, attributes,
-   * cell visibility or cell TTL) instead of a cell value.
-   */
-  private boolean isNonCellColumn(int i) {
-    return i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-        || i == parser.getAttributesKeyColumnIndex()
-        || i == parser.getCellVisibilityColumnIndex()
-        || i == parser.getCellTTLColumnIndex();
-  }
-
-  /**
-   * Parses every line of the row into KeyValues, sorts them and writes them out. Lines are
-   * consumed in batches bounded by the <code>reducer.row.threshold</code> heap size.
-   */
-  @Override
-  protected void reduce(
-      ImmutableBytesWritable rowKey,
-      java.lang.Iterable<Text> lines,
-      Reducer<ImmutableBytesWritable, Text,
-              ImmutableBytesWritable, KeyValue>.Context context)
-      throws java.io.IOException, InterruptedException
-  {
-    // although reduce() is called per-row, handle pathological case
-    final long maxBatchBytes = context.getConfiguration().getLong(
-        "reducer.row.threshold", 1L << 30);
-    final Iterator<Text> remaining = lines.iterator();
-    while (remaining.hasNext()) {
-      Set<KeyValue> batch = new TreeSet<>(CellComparator.COMPARATOR);
-      long batchBytes = 0;
-      // keep filling the batch until the input ends or the RAM threshold is reached
-      while (remaining.hasNext() && batchBytes < maxBatchBytes) {
-        Text line = remaining.next();
-        byte[] lineBytes = line.getBytes();
-        try {
-          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
-          // Retrieve timestamp if exists
-          ts = parsed.getTimestamp(ts);
-          cellVisibilityExpr = parsed.getCellVisibility();
-          ttl = parsed.getCellTTL();
-
-          // tags shared by every cell created from this line
-          List<Tag> tags = new ArrayList<>();
-          if (cellVisibilityExpr != null) {
-            tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
-              cellVisibilityExpr));
-          }
-          // Add TTL directly to the KV so we can vary them when packing more than one KV
-          // into puts
-          if (ttl > 0) {
-            tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
-          }
-          for (int col = 0; col < parsed.getColumnCount(); col++) {
-            if (!isNonCellColumn(col)) {
-              // Creating the KV which needs to be directly written to HFiles. Using the
-              // Facade KVCreator for creation of kvs.
-              Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
-                  parsed.getRowKeyLength(), parser.getFamily(col), 0,
-                  parser.getFamily(col).length, parser.getQualifier(col), 0,
-                  parser.getQualifier(col).length, ts, lineBytes,
-                  parsed.getColumnOffset(col), parsed.getColumnLength(col), tags);
-              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              batch.add(kv);
-              batchBytes += kv.heapSize();
-            }
-          }
-        } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
-            | InvalidLabelException badLine) {
-          if (!skipBadLines) {
-            throw new IOException(badLine);
-          }
-          System.err.println("Bad line." + badLine.getMessage());
-          incrementBadLineCount(1);
-        }
-      }
-      context.setStatus("Read " + batch.size() + " entries of " + batch.getClass()
-          + "(" + StringUtils.humanReadableInt(batchBytes) + ")");
-      int written = 0;
-      for (KeyValue kv : batch) {
-        context.write(rowKey, kv);
-        written++;
-        if (written > 0 && written % 100 == 0) {
-          context.setStatus("Wrote " + written + " key values.");
-        }
-      }
-
-      // if we have more entries to process
-      if (remaining.hasNext()) {
-        // force flush because we cannot guarantee intra-row sorted order
-        context.write(null, null);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
deleted file mode 100644
index a9d8e03..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
-import org.apache.hadoop.hbase.security.visibility.CellVisibility;
-import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Mapper that converts each line of TSV text into a {@link Put} keyed by the line's row key.
- */
-@InterfaceAudience.Public
-public class TsvImporterMapper
-extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
-{
-
-  /** Timestamp for all inserted rows */
-  protected long ts;
-
-  /** Column separator */
-  private String separator;
-
-  /** Should skip bad lines */
-  private boolean skipBadLines;
-  /** Should skip empty columns*/
-  private boolean skipEmptyColumns;
-  /** Counter incremented for every skipped bad line */
-  private Counter badLineCount;
-  /** Should print the content of bad lines to stderr */
-  private boolean logBadLines;
-
-  protected ImportTsv.TsvParser parser;
-
-  protected Configuration conf;
-
-  protected String cellVisibilityExpr;
-
-  protected long ttl;
-
-  protected CellCreator kvCreator;
-
-  /** Bulk output path; when null, cells are created as plain KeyValues without tags */
-  private String hfileOutPath;
-
-  /** List of cell tags */
-  private List<Tag> tags;
-
-  public long getTs() {
-    return ts;
-  }
-
-  public boolean getSkipBadLines() {
-    return skipBadLines;
-  }
-
-  public Counter getBadLineCount() {
-    return badLineCount;
-  }
-
-  public void incrementBadLineCount(int count) {
-    this.badLineCount.increment(count);
-  }
-
-  /**
-   * Handles initializing this class with objects specific to it (i.e., the parser).
-   * Common initialization that might be leveraged by a subclass is done in
-   * <code>doSetup</code>. Hence a subclass may choose to override this method
-   * and call <code>doSetup</code> as well before handling its own custom params.
-   *
-   * @param context the mapper context supplying the job configuration
-   */
-  @Override
-  protected void setup(Context context) {
-    doSetup(context);
-
-    conf = context.getConfiguration();
-    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
-                           separator);
-    if (parser.getRowKeyColumnIndex() == -1) {
-      throw new RuntimeException("No row key column specified");
-    }
-    this.kvCreator = new CellCreator(conf);
-    tags = new ArrayList<>();
-  }
-
-  /**
-   * Handles common parameter initialization that a subclass might want to leverage.
-   * @param context the mapper context supplying the job configuration and counters
-   */
-  protected void doSetup(Context context) {
-    Configuration conf = context.getConfiguration();
-
-    // If a custom separator has been used,
-    // decode it back from Base64 encoding.
-    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
-    if (separator == null) {
-      separator = ImportTsv.DEFAULT_SEPARATOR;
-    } else {
-      separator = new String(Base64.decode(separator));
-    }
-    // Should never get 0 as we are setting this to a valid value in job
-    // configuration.
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
-
-    skipEmptyColumns = context.getConfiguration().getBoolean(
-        ImportTsv.SKIP_EMPTY_COLUMNS, false);
-    skipBadLines = context.getConfiguration().getBoolean(
-        ImportTsv.SKIP_LINES_CONF_KEY, true);
-    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
-    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
-    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
-  }
-
-  /**
-   * Convert a line of TSV text into an HBase table row.
-   *
-   * @param offset the offset of the line, reported in the bad-line message
-   * @param value the TSV line to parse
-   * @param context the context the resulting row key and Put are written to
-   * @throws IOException if the line is bad and bad lines are not being skipped
-   */
-  @Override
-  public void map(LongWritable offset, Text value,
-    Context context)
-  throws IOException {
-    byte[] lineBytes = value.getBytes();
-
-    try {
-      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
-          lineBytes, value.getLength());
-      ImmutableBytesWritable rowKey =
-        new ImmutableBytesWritable(lineBytes,
-            parsed.getRowKeyOffset(),
-            parsed.getRowKeyLength());
-      // Retrieve timestamp if exists
-      ts = parsed.getTimestamp(ts);
-      cellVisibilityExpr = parsed.getCellVisibility();
-      ttl = parsed.getCellTTL();
-
-      // create tags for the parsed line
-      if (hfileOutPath != null) {
-        tags.clear();
-        if (cellVisibilityExpr != null) {
-          tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
-            cellVisibilityExpr));
-        }
-        // Add TTL directly to the KV so we can vary them when packing more than one KV
-        // into puts
-        if (ttl > 0) {
-          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
-        }
-      }
-      Put put = new Put(rowKey.copyBytes());
-      for (int i = 0; i < parsed.getColumnCount(); i++) {
-        // Metadata columns and, when requested, empty columns do not become cells.
-        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
-            || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
-            && parsed.getColumnLength(i) == 0)) {
-          continue;
-        }
-        populatePut(lineBytes, parsed, put, i);
-      }
-      context.write(rowKey, put);
-    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
-        | InvalidLabelException badLine) {
-      if (logBadLines) {
-        System.err.println(value);
-      }
-      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
-      if (skipBadLines) {
-        incrementBadLineCount(1);
-        return;
-      }
-      throw new IOException(badLine);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      // Restore the interrupt status so code further up the stack can still observe it.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Builds the cell for column <code>i</code> of the parsed line and adds it to the Put.
-   *
-   * @param lineBytes the raw bytes of the line; row key and value are read from it by offset
-   * @param parsed the parsed line providing offsets and lengths
-   * @param put the Put the new cell is added to
-   * @param i the index of the column to convert
-   */
-  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
-      int i) throws BadTsvLineException, IOException {
-    Cell cell = null;
-    if (hfileOutPath == null) {
-      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
-          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
-          parsed.getColumnOffset(i), parsed.getColumnLength(i));
-      if (cellVisibilityExpr != null) {
-        // We won't be validating the expression here. The Visibility CP will do
-        // the validation
-        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
-      }
-      if (ttl > 0) {
-        put.setTTL(ttl);
-      }
-    } else {
-      // Creating the KV which needs to be directly written to HFiles. Using the Facade
-      // KVCreator for creation of kvs.
-      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
-          parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
-          parsed.getColumnLength(i), tags);
-    }
-    put.add(cell);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
deleted file mode 100644
index 581f0d0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-
-/**
- * Mapper that emits each TSV line unchanged, keyed by the row key parsed out of it.
- */
-@InterfaceAudience.Public
-public class TsvImporterTextMapper
-extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
-{
-
-  /** Column separator */
-  private String separator;
-
-  /** Should skip bad lines */
-  private boolean skipBadLines;
-  /** Counter incremented for every skipped bad line */
-  private Counter badLineCount;
-  /** Should print the content of bad lines to stderr */
-  private boolean logBadLines;
-
-  /** Parser built in setup() from the configured columns and separator */
-  private ImportTsv.TsvParser parser;
-
-  public boolean getSkipBadLines() {
-    return skipBadLines;
-  }
-
-  public Counter getBadLineCount() {
-    return badLineCount;
-  }
-
-  public void incrementBadLineCount(int count) {
-    this.badLineCount.increment(count);
-  }
-
-  /**
-   * Initializes the state specific to this class (the parser). Initialization a subclass
-   * might want to reuse lives in <code>doSetup</code>, so a subclass may override this
-   * method and call <code>doSetup</code> itself before handling its own custom parameters.
-   *
-   * @param context the mapper context supplying the job configuration
-   */
-  @Override
-  protected void setup(Context context) {
-    doSetup(context);
-
-    final String columns = context.getConfiguration().get(ImportTsv.COLUMNS_CONF_KEY);
-    parser = new ImportTsv.TsvParser(columns, separator);
-    if (parser.getRowKeyColumnIndex() == -1) {
-      throw new RuntimeException("No row key column specified");
-    }
-  }
-
-  /**
-   * Initializes the common parameters (separator, bad-line handling) that a subclass
-   * might want to reuse.
-   * @param context the mapper context supplying the job configuration and counters
-   */
-  protected void doSetup(Context context) {
-    final Configuration jobConf = context.getConfiguration();
-
-    // A custom separator is stored Base64 encoded; fall back to the default when unset.
-    final String encodedSeparator = jobConf.get(ImportTsv.SEPARATOR_CONF_KEY);
-    separator = (encodedSeparator == null)
-        ? ImportTsv.DEFAULT_SEPARATOR
-        : new String(Base64.decode(encodedSeparator));
-
-    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
-    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
-    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
-  }
-
-  /**
-   * Writes the TSV line to the map output under the row key parsed from it.
-   */
-  @Override
-  public void map(LongWritable offset, Text value, Context context) throws IOException {
-    try {
-      final byte[] lineBytes = value.getBytes();
-      Pair<Integer,Integer> rowKeyRange = parser.parseRowKey(lineBytes, value.getLength());
-      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
-          lineBytes, rowKeyRange.getFirst(), rowKeyRange.getSecond());
-      context.write(rowKey, value);
-    } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
-      if (logBadLines) {
-        System.err.println(value);
-      }
-      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
-      if (!skipBadLines) {
-        throw new IOException(badLine);
-      }
-      incrementBadLineCount(1);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      Thread.currentThread().interrupt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
deleted file mode 100644
index a83a88f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.Tag;
-
-/**
- * Interface to convert visibility expressions into Tags for storing along with Cells in HFiles.
- * Implementations are {@link Configurable}, so they can be handed a configuration.
- */
-@InterfaceAudience.Public
-public interface VisibilityExpressionResolver extends Configurable {
-
-  /**
-   * Gives the implementation a chance to initialize itself.
-   * NOTE(review): presumably called after the configuration has been set; confirm with callers.
-   */
-  void init();
-
-  /**
-   * Convert visibility expression into tags to be serialized.
-   * @param visExpression the label expression
-   * @return The list of tags that corresponds to the visibility expression. These tags will be
-   *         stored along with the Cells.
-   * @throws IOException declared for implementations; the exact failure conditions are
-   *         implementation specific
-   */
-  List<Tag> createVisibilityExpTags(String visExpression) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
deleted file mode 100644
index 8b4e967..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
- */
-@InterfaceAudience.Public
-public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
-  private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
-
-  public static final String START_TIME_KEY = "wal.start.time";
-  public static final String END_TIME_KEY = "wal.end.time";
-
-  /**
-   * {@link InputSplit} describing exactly one {@link WAL} file together with the time
-   * range of interest for that file.
-   */
-  static class WALSplit extends InputSplit implements Writable {
-    private String logFileName;
-    private long fileSize;
-    private long startTime;
-    private long endTime;
-
-    /** for serialization */
-    public WALSplit() {}
-
-    /**
-     * Creates a split for a single WAL file. The start and end time travel with the
-     * split, so that WAL files can be filtered before WALEdits are passed to the mapper(s).
-     * @param logFileName name of the WAL file
-     * @param fileSize value reported by {@link #getLength()}
-     * @param startTime start of the time range
-     * @param endTime end of the time range
-     */
-    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
-      this.logFileName = logFileName;
-      this.fileSize = fileSize;
-      this.startTime = startTime;
-      this.endTime = endTime;
-    }
-
-    public String getLogFileName() {
-      return this.logFileName;
-    }
-
-    public long getStartTime() {
-      return this.startTime;
-    }
-
-    public long getEndTime() {
-      return this.endTime;
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-      return this.fileSize;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      // TODO: Find the data node with the most blocks for this WAL?
-      return new String[0];
-    }
-
-    /** Serializes the fields in the order expected by {@link #readFields(DataInput)}. */
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeUTF(this.logFileName);
-      out.writeLong(this.fileSize);
-      out.writeLong(this.startTime);
-      out.writeLong(this.endTime);
-    }
-
-    /** Restores the fields in the order written by {@link #write(DataOutput)}. */
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      this.logFileName = in.readUTF();
-      this.fileSize = in.readLong();
-      this.startTime = in.readLong();
-      this.endTime = in.readLong();
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder text = new StringBuilder();
-      text.append(logFileName).append(" (").append(startTime).append(":").append(endTime);
-      text.append(") length:").append(fileSize);
-      return text.toString();
-    }
-  }
-
-  /**
-   * {@link RecordReader} for an {@link WAL} file.
-   * Implementation shared with deprecated HLogInputFormat.
-   * Only entries whose write time lies within the split's start and end time are returned.
-   */
-  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
-    private Reader reader = null;
-    // visible until we can remove the deprecated HLogInputFormat
-    Entry currentEntry = new Entry();
-    private long startTime;
-    private long endTime;
-    private Configuration conf;
-    private Path logFile;
-    // reader position recorded before each read; used to seek after the file is reopened
-    private long currentPos;
-
-    /**
-     * Opens a reader on the WAL file named by the split and records the split's time range.
-     * @param split the split to read; must be a {@link WALSplit}
-     * @param context the task context supplying the configuration
-     */
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      WALSplit hsplit = (WALSplit)split;
-      logFile = new Path(hsplit.getLogFileName());
-      conf = context.getConfiguration();
-      LOG.info("Opening reader for "+split);
-      openReader(logFile);
-      this.startTime = hsplit.getStartTime();
-      this.endTime = hsplit.getEndTime();
-    }
-
-    /** Closes any open reader, opens one on the given path and seeks to the saved position. */
-    private void openReader(Path path) throws IOException
-    {
-      closeReader();
-      reader = AbstractFSWALProvider.openReader(path, conf);
-      seek();
-      setCurrentPath(path);
-    }
-
-    private void setCurrentPath(Path path) {
-      this.logFile = path;
-    }
-
-    private void closeReader() throws IOException {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-
-    private void seek() throws IOException {
-      if (currentPos != 0) {
-        reader.seek(currentPos);
-      }
-    }
-
-    /**
-     * Advances to the next entry whose write time is not before the start time.
-     * @return true if such an entry was read and its write time is not after the end time;
-     *         false at end of file, on a truncated entry, or once the end time is passed
-     * @throws IOException if reading fails and no different archived path can be tried
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (reader == null) {
-        return false;
-      }
-      this.currentPos = reader.getPosition();
-      Entry temp;
-      // number of entries skipped because they are older than startTime
-      long i = -1;
-      try {
-        do {
-          // skip older entries
-          try {
-            temp = reader.next(currentEntry);
-            i++;
-          } catch (EOFException x) {
-            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
-                + " (This is normal when a RegionServer crashed.)");
-            return false;
-          }
-        } while (temp != null && temp.getKey().getWriteTime() < startTime);
-
-        if (temp == null) {
-          if (i > 0) {
-            LOG.info("Skipped " + i + " entries.");
-          }
-          LOG.info("Reached end of file.");
-          return false;
-        } else if (i > 0) {
-          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
-        }
-        boolean res = temp.getKey().getWriteTime() <= endTime;
-        if (!res) {
-          LOG.info("Reached ts: " + temp.getKey().getWriteTime()
-              + " ignoring the rest of the file.");
-        }
-        return res;
-      } catch (IOException e) {
-        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
-        // Compare by value, not by reference: a distinct but equal Path object would
-        // otherwise be treated as a new location, reopening the same file and recursing
-        // without end instead of surfacing the original failure.
-        if (!logFile.equals(archivedLog)) {
-          openReader(archivedLog);
-          // Try call again in recursion
-          return nextKeyValue();
-        } else {
-          throw e;
-        }
-      }
-    }
-
-    @Override
-    public WALEdit getCurrentValue() throws IOException, InterruptedException {
-      return currentEntry.getEdit();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      // N/A depends on total number of entries, which is unknown
-      return 0;
-    }
-
-    @Override
-    public void close() throws IOException {
-      LOG.info("Closing reader");
-      if (reader != null) {
-        this.reader.close();
-      }
-    }
-  }
-
-  /**
-   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
-   * need to support HLogInputFormat.
-   */
-  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
-    @Override
-    public WALKey getCurrentKey() throws IOException, InterruptedException {
-      return currentEntry.getKey();
-    }
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException,
-      InterruptedException {
-    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
-  }
-
-  /**
-   * implementation shared with deprecated HLogInputFormat
-   */
-  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
-    Path[] inputPaths = getInputPaths(conf);
-    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
-    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
-
-    List<FileStatus> allFiles = new ArrayList<FileStatus>();
-    for(Path inputPath: inputPaths){
-      FileSystem fs = inputPath.getFileSystem(conf);
-      try {
-        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
-        allFiles.addAll(files);
-      } catch (FileNotFoundException e) {
-        if (ignoreMissing) {
-          LOG.warn("File "+ inputPath +" is missing. Skipping it.");
-          continue;
-        }
-        throw e;
-      }
-    }
-    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
-    for (FileStatus file : allFiles) {
-      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
-    }
-    return splits;
-  }
-
-  private Path[] getInputPaths(Configuration conf) {
-    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
-    return StringUtils.stringToPath(
-      inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
-  }
-
-  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
-      throws IOException {
-    List<FileStatus> result = new ArrayList<>();
-    LOG.debug("Scanning " + dir.toString() + " for WAL files");
-
-    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
-    if (!iter.hasNext()) return Collections.emptyList();
-    while (iter.hasNext()) {
-      LocatedFileStatus file = iter.next();
-      if (file.isDirectory()) {
-        // recurse into sub directories
-        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
-      } else {
-        String name = file.getPath().toString();
-        int idx = name.lastIndexOf('.');
-        if (idx > 0) {
-          try {
-            long fileStartTime = Long.parseLong(name.substring(idx+1));
-            if (fileStartTime <= endTime) {
-              LOG.info("Found: " + file);
-              result.add(file);
-            }
-          } catch (NumberFormatException x) {
-            idx = 0;
-          }
-        }
-        if (idx == 0) {
-          LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
-        }
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new WALKeyRecordReader();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
deleted file mode 100644
index b1e655c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to replay WAL files as a M/R job.
- * The WAL can be replayed for a set of tables or all tables,
- * and a time range can be provided (in milliseconds).
- * The WAL is filtered to the passed set of tables and  the output
- * can optionally be mapped to another set of tables.
- *
- * WAL replay can also generate HFiles for later bulk importing,
- * in that case the WAL is replayed for a single table only.
- */
-@InterfaceAudience.Public
-public class WALPlayer extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(WALPlayer.class);
-  final static String NAME = "WALPlayer";
-  public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
-  public final static String TABLES_KEY = "wal.input.tables";
-  public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
-  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
-  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
-
-
-  // This relies on Hadoop Configuration to handle warning about deprecated configs and
-  // to set the correct non-deprecated configs when an old one shows up.
-  static {
-    Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
-    Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
-    Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
-  }
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public WALPlayer(){
-  }
-
-  protected WALPlayer(final Configuration c) {
-    super(c);
-  }
-
-  /**
-   * A mapper that just writes out KeyValues.
-   * This one can be used together with {@link KeyValueSortReducer}
-   */
-  static class WALKeyValueMapper
-    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
-    private byte[] table;
-
-    @Override
-    public void map(WALKey key, WALEdit value,
-      Context context)
-    throws IOException {
-      try {
-        // skip all other tables
-        if (Bytes.equals(table, key.getTablename().getName())) {
-          for (Cell cell : value.getCells()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            if (WALEdit.isMetaEditFamily(kv)) {
-              continue;
-            }
-            context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      // only a single table is supported when HFiles are generated with HFileOutputFormat
-      String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tables == null || tables.length != 1) {
-        // this can only happen when WALMapper is used directly by a class other than WALPlayer
-        throw new IOException("Exactly one table must be specified for bulk HFile case.");
-      }
-      table = Bytes.toBytes(tables[0]);
-
-    }
-
-  }
-
-  /**
-   * A mapper that writes out {@link Mutation} to be directly applied to
-   * a running HBase instance.
-   */
-  protected static class WALMapper
-  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
-    private Map<TableName, TableName> tables = new TreeMap<>();
-
-    @Override
-    public void map(WALKey key, WALEdit value, Context context)
-    throws IOException {
-      try {
-        if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
-          TableName targetTable = tables.isEmpty() ?
-                key.getTablename() :
-                tables.get(key.getTablename());
-          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
-          Put put = null;
-          Delete del = null;
-          Cell lastCell = null;
-          for (Cell cell : value.getCells()) {
-            // filtering WAL meta entries
-            if (WALEdit.isMetaEditFamily(cell)) {
-              continue;
-            }
-
-            // Allow a subclass filter out this cell.
-            if (filter(context, cell)) {
-              // A WALEdit may contain multiple operations (HBASE-3584) and/or
-              // multiple rows (HBASE-5229).
-              // Aggregate as much as possible into a single Put/Delete
-              // operation before writing to the context.
-              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
-                  || !CellUtil.matchingRow(lastCell, cell)) {
-                // row or type changed, write out aggregate KVs.
-                if (put != null) {
-                  context.write(tableOut, put);
-                }
-                if (del != null) {
-                  context.write(tableOut, del);
-                }
-                if (CellUtil.isDelete(cell)) {
-                  del = new Delete(CellUtil.cloneRow(cell));
-                } else {
-                  put = new Put(CellUtil.cloneRow(cell));
-                }
-              }
-              if (CellUtil.isDelete(cell)) {
-                del.add(cell);
-              } else {
-                put.add(cell);
-              }
-            }
-            lastCell = cell;
-          }
-          // write residual KVs
-          if (put != null) {
-            context.write(tableOut, put);
-          }
-          if (del != null) {
-            context.write(tableOut, del);
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    protected boolean filter(Context context, final Cell cell) {
-      return true;
-    }
-
-    @Override
-    protected void
-        cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
-            throws IOException, InterruptedException {
-      super.cleanup(context);
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
-      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tableMap == null) {
-        tableMap = tablesToUse;
-      }
-      if (tablesToUse == null) {
-        // Then user wants all tables.
-      } else if (tablesToUse.length != tableMap.length) {
-        // this can only happen when WALMapper is used directly by a class other than WALPlayer
-        throw new IOException("Incorrect table mapping specified .");
-      }
-      int i = 0;
-      if (tablesToUse != null) {
-        for (String table : tablesToUse) {
-          tables.put(TableName.valueOf(table),
-            TableName.valueOf(tableMap[i++]));
-        }
-      }
-    }
-  }
-
-  void setupTime(Configuration conf, String option) throws IOException {
-    String val = conf.get(option);
-    if (null == val) {
-      return;
-    }
-    long ms;
-    try {
-      // first try to parse in user friendly form
-      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
-    } catch (ParseException pe) {
-      try {
-        // then see if just a number of ms's was specified
-        ms = Long.parseLong(val);
-      } catch (NumberFormatException nfe) {
-        throw new IOException(option
-            + " must be specified either in the form 2001-02-20T16:35:06.99 "
-            + "or as number of milliseconds");
-      }
-    }
-    conf.setLong(option, ms);
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Configuration conf = getConf();
-    setupTime(conf, WALInputFormat.START_TIME_KEY);
-    setupTime(conf, WALInputFormat.END_TIME_KEY);
-    String inputDirs = args[0];
-    String[] tables = args[1].split(",");
-    String[] tableMap;
-    if (args.length > 2) {
-      tableMap = args[2].split(",");
-      if (tableMap.length != tables.length) {
-        throw new IOException("The same number of tables and mapping must be provided.");
-      }
-    } else {
-      // if not mapping is specified map each table to itself
-      tableMap = tables;
-    }
-    conf.setStrings(TABLES_KEY, tables);
-    conf.setStrings(TABLE_MAP_KEY, tableMap);
-    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
-    job.setJarByClass(WALPlayer.class);
-
-    job.setInputFormatClass(WALInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-    if (hfileOutPath != null) {
-      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
-
-      // the bulk HFile case
-      if (tables.length != 1) {
-        throw new IOException("Exactly one table must be specified for the bulk export option");
-      }
-      TableName tableName = TableName.valueOf(tables[0]);
-      job.setMapperClass(WALKeyValueMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
-      Path outputDir = new Path(hfileOutPath);
-      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-      }
-      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-          org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-    } else {
-      // output to live cluster
-      job.setMapperClass(WALMapper.class);
-      job.setOutputFormatClass(MultiTableOutputFormat.class);
-      TableMapReduceUtil.addDependencyJars(job);
-      TableMapReduceUtil.initCredentials(job);
-      // No reducers.
-      job.setNumReduceTasks(0);
-    }
-    String codecCls = WALCellCodec.getWALCellCodecClass(conf);
-    try {
-      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
-    } catch (Exception e) {
-      throw new IOException("Cannot determine wal codec class " + codecCls, e);
-    }
-    return job;
-  }
-
-
-  /**
-   * Print usage
-   * @param errorMsg Error message.  Can be null.
-   */
-  private void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
-    System.err.println("Read all WAL entries for <tables>.");
-    System.err.println("If no tables (\"\") are specific, all tables are imported.");
-    System.err.println("(Careful, even hbase:meta entries will be imported"+
-      " in that case.)");
-    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
-    System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
-    System.err.println("<tableMapping> is a command separated list of targettables.");
-    System.err.println("If specified, each table in <tables> must have a mapping.\n");
-    System.err.println("By default " + NAME + " will load data directly into HBase.");
-    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
-    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
-    System.err.println("Other options: (specify time range to WAL edit to consider)");
-    System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
-    System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-      + "=jobName - use the specified mapreduce job name for the wal player");
-    System.err.println("For performance also consider the following options:\n"
-      + "  -Dmapreduce.map.speculative=false\n"
-      + "  -Dmapreduce.reduce.speculative=false");
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args  The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(args);
-    return job.waitForCompletion(true) ? 0 : 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
deleted file mode 100644
index 199e168..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
-Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
-Input/OutputFormats, a table indexing MapReduce job, and utility methods.
-
-<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
-in the HBase Reference Guide for mapreduce over hbase documentation. 
-*/
-package org.apache.hadoop.hbase.mapreduce;