You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:23 UTC

[23/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
deleted file mode 100644
index b5bb2ec..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.zookeeper.KeeperException;
-
-
-/**
- * Import data written by {@link Export}.
- */
-@InterfaceAudience.Public
-public class Import extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(Import.class);
-  final static String NAME = "import";
-  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
-  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
-  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
-  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
-  public final static String TABLE_NAME = "import.table.name";
-  public final static String WAL_DURABILITY = "import.wal.durability";
-  public final static String HAS_LARGE_RESULT= "import.bulk.hasLargeResult";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public static class KeyValueWritableComparablePartitioner 
-      extends Partitioner<KeyValueWritableComparable, KeyValue> {
-    private static KeyValueWritableComparable[] START_KEYS = null;
-    @Override
-    public int getPartition(KeyValueWritableComparable key, KeyValue value,
-        int numPartitions) {
-      for (int i = 0; i < START_KEYS.length; ++i) {
-        if (key.compareTo(START_KEYS[i]) <= 0) {
-          return i;
-        }
-      }
-      return START_KEYS.length;
-    }
-    
-  }
-  
-  public static class KeyValueWritableComparable 
-      implements WritableComparable<KeyValueWritableComparable> {
-
-    private KeyValue kv = null;
-    
-    static {                                       
-      // register this comparator
-      WritableComparator.define(KeyValueWritableComparable.class, 
-          new KeyValueWritableComparator());
-    }
-    
-    public KeyValueWritableComparable() {
-    }
-    
-    public KeyValueWritableComparable(KeyValue kv) {
-      this.kv = kv;
-    }
-    
-    @Override
-    public void write(DataOutput out) throws IOException {
-      KeyValue.write(kv, out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      kv = KeyValue.create(in);
-    }
-
-    @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="This is wrong, yes, but we should be purging Writables, not fixing them")
-    public int compareTo(KeyValueWritableComparable o) {
-      return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
-    }
-    
-    public static class KeyValueWritableComparator extends WritableComparator {
-
-      @Override
-      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-        try {
-          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
-          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
-          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
-          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
-          return compare(kv1, kv2);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        } 
-      }
-      
-    }
-    
-  }
-  
-  public static class KeyValueReducer
-      extends
-      Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
-    protected void reduce(
-        KeyValueWritableComparable row,
-        Iterable<KeyValue> kvs,
-        Reducer<KeyValueWritableComparable,
-          KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
-        throws java.io.IOException, InterruptedException {
-      int index = 0;
-      for (KeyValue kv : kvs) {
-        context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
-        if (++index % 100 == 0)
-          context.setStatus("Wrote " + index + " KeyValues, "
-              + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); 
-      }
-    }
-  }
-  
-  public static class KeyValueSortImporter 
-      extends TableMapper<KeyValueWritableComparable, KeyValue> {
-    private Map<byte[], byte[]> cfRenameMap;
-    private Filter filter;
-    private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
-
-    /**
-     * @param row  The current table row key.
-     * @param value  The columns.
-     * @param context  The current context.
-     * @throws IOException When something is broken with the data.
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, Result value,
-      Context context)
-    throws IOException {
-      try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Considering the row."
-              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
-        }
-        if (filter == null
-            || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
-                (short) row.getLength()))) {
-          for (Cell kv : value.rawCells()) {
-            kv = filterKv(filter, kv);
-            // skip if we filtered it out
-            if (kv == null) continue;
-            // TODO get rid of ensureKeyValue
-            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
-            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); 
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-    
-    @Override
-    public void setup(Context context) throws IOException { 
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
-      filter = instantiateFilter(context.getConfiguration());
-      int reduceNum = context.getNumReduceTasks();
-      Configuration conf = context.getConfiguration();
-      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        byte[][] startKeys = regionLocator.getStartKeys();
-        if (startKeys.length != reduceNum) {
-          throw new IOException("Region split after job initialization");
-        }
-        KeyValueWritableComparable[] startKeyWraps = 
-            new KeyValueWritableComparable[startKeys.length - 1];
-        for (int i = 1; i < startKeys.length; ++i) {
-          startKeyWraps[i - 1] = 
-              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
-        }
-        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
-      }
-    }
-  }
-  
-  /**
-   * A mapper that just writes out KeyValues.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="Writables are going away and this has been this way forever")
-  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
-    private Map<byte[], byte[]> cfRenameMap;
-    private Filter filter;
-    private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
-
-    /**
-     * @param row  The current table row key.
-     * @param value  The columns.
-     * @param context  The current context.
-     * @throws IOException When something is broken with the data.
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, Result value,
-      Context context)
-    throws IOException {
-      try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Considering the row."
-              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
-        }
-        if (filter == null
-            || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
-                (short) row.getLength()))) {
-          for (Cell kv : value.rawCells()) {
-            kv = filterKv(filter, kv);
-            // skip if we filtered it out
-            if (kv == null) continue;
-            // TODO get rid of ensureKeyValue
-            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
-      filter = instantiateFilter(context.getConfiguration());
-    }
-  }
-
-  /**
-   * Write table content out to files in hdfs.
-   */
-  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
-    private Map<byte[], byte[]> cfRenameMap;
-    private List<UUID> clusterIds;
-    private Filter filter;
-    private Durability durability;
-
-    /**
-     * @param row  The current table row key.
-     * @param value  The columns.
-     * @param context  The current context.
-     * @throws IOException When something is broken with the data.
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, Result value,
-      Context context)
-    throws IOException {
-      try {
-        writeResult(row, value, context);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
-    throws IOException, InterruptedException {
-      Put put = null;
-      Delete delete = null;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Considering the row."
-            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
-      }
-      if (filter == null
-          || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
-              (short) key.getLength()))) {
-        processKV(key, result, context, put, delete);
-      }
-    }
-
-    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
-        Delete delete) throws IOException, InterruptedException {
-      for (Cell kv : result.rawCells()) {
-        kv = filterKv(filter, kv);
-        // skip if we filter it out
-        if (kv == null) continue;
-
-        kv = convertKv(kv, cfRenameMap);
-        // Deletes and Puts are gathered and written when finished
-        /*
-         * If there are sequence of mutations and tombstones in an Export, and after Import the same
-         * sequence should be restored as it is. If we combine all Delete tombstones into single
-         * request then there is chance of ignoring few DeleteFamily tombstones, because if we
-         * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining
-         * only newest in hbase table and ignoring other. Check - HBASE-12065
-         */
-        if (CellUtil.isDeleteFamily(kv)) {
-          Delete deleteFamily = new Delete(key.get());
-          deleteFamily.add(kv);
-          if (durability != null) {
-            deleteFamily.setDurability(durability);
-          }
-          deleteFamily.setClusterIds(clusterIds);
-          context.write(key, deleteFamily);
-        } else if (CellUtil.isDelete(kv)) {
-          if (delete == null) {
-            delete = new Delete(key.get());
-          }
-          delete.add(kv);
-        } else {
-          if (put == null) {
-            put = new Put(key.get());
-          }
-          addPutToKv(put, kv);
-        }
-      }
-      if (put != null) {
-        if (durability != null) {
-          put.setDurability(durability);
-        }
-        put.setClusterIds(clusterIds);
-        context.write(key, put);
-      }
-      if (delete != null) {
-        if (durability != null) {
-          delete.setDurability(durability);
-        }
-        delete.setClusterIds(clusterIds);
-        context.write(key, delete);
-      }
-    }
-
-    protected void addPutToKv(Put put, Cell kv) throws IOException {
-      put.add(kv);
-    }
-
-    @Override
-    public void setup(Context context) {
-      LOG.info("Setting up " + getClass() + " mapper.");
-      Configuration conf = context.getConfiguration();
-      cfRenameMap = createCfRenameMap(conf);
-      filter = instantiateFilter(conf);
-      String durabilityStr = conf.get(WAL_DURABILITY);
-      if(durabilityStr != null){
-        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
-        LOG.info("setting WAL durability to " + durability);
-      } else {
-        LOG.info("setting WAL durability to default.");
-      }
-      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
-      ZooKeeperWatcher zkw = null;
-      Exception ex = null;
-      try {
-        zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
-        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
-      } catch (ZooKeeperConnectionException e) {
-        ex = e;
-        LOG.error("Problem connecting to ZooKeper during task setup", e);
-      } catch (KeeperException e) {
-        ex = e;
-        LOG.error("Problem reading ZooKeeper data during task setup", e);
-      } catch (IOException e) {
-        ex = e;
-        LOG.error("Problem setting up task", e);
-      } finally {
-        if (zkw != null) zkw.close();
-      }
-      if (clusterIds == null) {
-        // exit early if setup fails
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  /**
-   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
-   * optionally not include in the job output
-   * @param conf {@link Configuration} from which to load the filter
-   * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
-   * @throws IllegalArgumentException if the filter is misconfigured
-   */
-  public static Filter instantiateFilter(Configuration conf) {
-    // get the filter, if it was configured    
-    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
-    if (filterClass == null) {
-      LOG.debug("No configured filter class, accepting all keyvalues.");
-      return null;
-    }
-    LOG.debug("Attempting to create filter:" + filterClass);
-    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
-    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
-    try {
-      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
-      return (Filter) m.invoke(null, quotedArgs);
-    } catch (IllegalAccessException e) {
-      LOG.error("Couldn't instantiate filter!", e);
-      throw new RuntimeException(e);
-    } catch (SecurityException e) {
-      LOG.error("Couldn't instantiate filter!", e);
-      throw new RuntimeException(e);
-    } catch (NoSuchMethodException e) {
-      LOG.error("Couldn't instantiate filter!", e);
-      throw new RuntimeException(e);
-    } catch (IllegalArgumentException e) {
-      LOG.error("Couldn't instantiate filter!", e);
-      throw new RuntimeException(e);
-    } catch (InvocationTargetException e) {
-      LOG.error("Couldn't instantiate filter!", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
-    ArrayList<byte[]> quotedArgs = new ArrayList<>();
-    for (String stringArg : stringArgs) {
-      // all the filters' instantiation methods expected quoted args since they are coming from
-      // the shell, so add them here, though it shouldn't really be needed :-/
-      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
-    }
-    return quotedArgs;
-  }
-
-  /**
-   * Attempt to filter out the keyvalue
-   * @param kv {@link KeyValue} on which to apply the filter
-   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
-   *         {@link KeyValue}
-   */
-  public static Cell filterKv(Filter filter, Cell kv) throws IOException {
-    // apply the filter and skip this kv if the filter doesn't apply
-    if (filter != null) {
-      Filter.ReturnCode code = filter.filterKeyValue(kv);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Filter returned:" + code + " for the key value:" + kv);
-      }
-      // if its not an accept type, then skip this kv
-      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
-          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
-        return null;
-      }
-    }
-    return kv;
-  }
-
-  // helper: create a new KeyValue based on CF rename map
-  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
-    if(cfRenameMap != null) {
-      // If there's a rename mapping for this CF, create a new KeyValue
-      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
-      if(newCfName != null) {
-          kv = new KeyValue(kv.getRowArray(), // row buffer 
-                  kv.getRowOffset(),        // row offset
-                  kv.getRowLength(),        // row length
-                  newCfName,                // CF buffer
-                  0,                        // CF offset 
-                  newCfName.length,         // CF length 
-                  kv.getQualifierArray(),   // qualifier buffer
-                  kv.getQualifierOffset(),  // qualifier offset
-                  kv.getQualifierLength(),  // qualifier length
-                  kv.getTimestamp(),        // timestamp
-                  KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
-                  kv.getValueArray(),       // value buffer 
-                  kv.getValueOffset(),      // value offset
-                  kv.getValueLength());     // value length
-      }
-    }
-    return kv;
-  }
-
-  // helper: make a map from sourceCfName to destCfName by parsing a config key
-  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
-    Map<byte[], byte[]> cfRenameMap = null;
-    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
-    if(allMappingsPropVal != null) {
-      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
-      String[] allMappings = allMappingsPropVal.split(",");
-      for (String mapping: allMappings) {
-        if(cfRenameMap == null) {
-            cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-        }
-        String [] srcAndDest = mapping.split(":");
-        if(srcAndDest.length != 2) {
-            continue;
-        }
-        cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
-      }
-    }
-    return cfRenameMap;
-  }
-
-  /**
-   * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
-   * the mapper how to rename column families.
-   * 
-   * <p>Alternately, instead of calling this function, you could set the configuration key 
-   * {@link #CF_RENAME_PROP} yourself. The value should look like 
-   * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
-   * the mapper behavior.
-   * 
-   * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
-   *  set
-   * @param renameMap a mapping from source CF names to destination CF names
-   */
-  static public void configureCfRenaming(Configuration conf, 
-          Map<String, String> renameMap) {
-    StringBuilder sb = new StringBuilder();
-    for(Map.Entry<String,String> entry: renameMap.entrySet()) {
-      String sourceCf = entry.getKey();
-      String destCf = entry.getValue();
-
-      if(sourceCf.contains(":") || sourceCf.contains(",") || 
-              destCf.contains(":") || destCf.contains(",")) {
-        throw new IllegalArgumentException("Illegal character in CF names: " 
-              + sourceCf + ", " + destCf);
-      }
-
-      if(sb.length() != 0) {
-        sb.append(",");
-      }
-      sb.append(sourceCf + ":" + destCf);
-    }
-    conf.set(CF_RENAME_PROP, sb.toString());
-  }
-
-  /**
-   * Add a Filter to be instantiated on import
-   * @param conf Configuration to update (will be passed to the job)
-   * @param clazz {@link Filter} subclass to instantiate on the server.
-   * @param filterArgs List of arguments to pass to the filter on instantiation
-   */
-  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
-      List<String> filterArgs) throws IOException {
-    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
-    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
-  }
-
-  /**
-   * Sets up the actual job.
-   * @param conf The current configuration.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
-    TableName tableName = TableName.valueOf(args[0]);
-    conf.set(TABLE_NAME, tableName.getNameAsString());
-    Path inputDir = new Path(args[1]);
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(Importer.class);
-    FileInputFormat.setInputPaths(job, inputDir);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-
-    // make sure we get the filter in the jars
-    try {
-      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
-      if (filter != null) {
-        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
-      LOG.info("Use Large Result!!");
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-        job.setMapperClass(KeyValueSortImporter.class);
-        job.setReducerClass(KeyValueReducer.class);
-        Path outputDir = new Path(hfileOutPath);
-        FileOutputFormat.setOutputPath(job, outputDir);
-        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", 
-            KeyValueWritableComparable.KeyValueWritableComparator.class,
-            RawComparator.class);
-        Path partitionsPath = 
-            new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
-        FileSystem fs = FileSystem.get(job.getConfiguration());
-        fs.deleteOnExit(partitionsPath);
-        job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
-        job.setNumReduceTasks(regionLocator.getStartKeys().length);
-        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-      }
-    } else if (hfileOutPath != null) {
-      LOG.info("writing to hfiles for bulk load.");
-      job.setMapperClass(KeyValueImporter.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf); 
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)){
-        job.setReducerClass(KeyValueSortReducer.class);
-        Path outputDir = new Path(hfileOutPath);
-        FileOutputFormat.setOutputPath(job, outputDir);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-      }
-    } else {
-      LOG.info("writing directly to table from Mapper.");
-      // No reducers.  Just write straight to table.  Call initTableReducerJob
-      // because it sets up the TableOutputFormat.
-      job.setMapperClass(Importer.class);
-      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
-      job.setNumReduceTasks(0);
-    }
-    return job;
-  }
-
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: Import [options] <tablename> <inputdir>");
-    System.err.println("By default Import will load data directly into HBase. To instead generate");
-    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
-    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("If there is a large result that includes too much KeyValue "
-        + "whitch can occur OOME caused by the memery sort in reducer, pass the option:");
-    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
-    System.err
-        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
-    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
-    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
-    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
-        + CF_RENAME_PROP + " property. Futher, filters will only use the"
-        + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
-        + " whether the current row needs to be ignored completely for processing and "
-        + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
-        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
-        + " the KeyValue.");
-    System.err.println("To import data exported from HBase 0.94, use");
-    System.err.println("  -Dhbase.import.version=0.94");
-    System.err.println("  -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the import");
-    System.err.println("For performance consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n"
-        + "  -Dmapreduce.reduce.speculative=false\n"
-        + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
-            +" Allowed values are the supported durability values"
-            +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
-  }
-
-  /**
-   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
-   * need to flush all the regions of the table as the data is held in memory and is also not
-   * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
-   * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
-   */
-  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
-      InterruptedException {
-    String tableName = conf.get(TABLE_NAME);
-    Admin hAdmin = null;
-    Connection connection = null;
-    String durability = conf.get(WAL_DURABILITY);
-    // Need to flush if the data is written to hbase and skip wal is enabled.
-    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
-        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
-      LOG.info("Flushing all data that skipped the WAL.");
-      try {
-        connection = ConnectionFactory.createConnection(conf);
-        hAdmin = connection.getAdmin();
-        hAdmin.flush(TableName.valueOf(tableName));
-      } finally {
-        if (hAdmin != null) {
-          hAdmin.close();
-        }
-        if (connection != null) {
-          connection.close();
-        }
-      }
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      return -1;
-    }
-    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
-    if (inputVersionString != null) {
-      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    boolean isJobSuccessful = job.waitForCompletion(true);
-    if(isJobSuccessful){
-      // Flush all the regions of the table
-      flushRegionsIfNecessary(getConf());
-    }
-    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
-    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
-    if (outputRecords < inputRecords) {
-      System.err.println("Warning, not all records were imported (maybe filtered out).");
-      if (outputRecords == 0) {
-        System.err.println("If the data was exported from HBase 0.94 "+
-            "consider using -Dhbase.import.version=0.94.");
-      }
-    }
-
-    return (isJobSuccessful ? 0 : 1);
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
-    System.exit(errCode);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
deleted file mode 100644
index b64271e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ /dev/null
@@ -1,793 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static java.lang.String.format;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
-/**
- * Tool to import data from a TSV file.
- *
- * This tool is rather simplistic - it doesn't do any quoting or
- * escaping, but is useful for many data loads.
- *
- * @see ImportTsv#usage(String)
- */
-@InterfaceAudience.Public
-public class ImportTsv extends Configured implements Tool {
-
-  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
-
-  final static String NAME = "importtsv";
-
-  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
-  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
-  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
-  public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
-  // Move them out of the tool and let the mapper handle its own validation.
-  public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
-  // If true, bad lines are logged to stderr. Default: false.
-  public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
-  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
-  public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
-  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
-  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-  public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
-  //This config is used to propagate credentials from parent MR jobs which launch
-  //ImportTSV jobs. SEE IntegrationTestImportTsv.
-  public final static String CREDENTIALS_LOCATION = "credentials_location";
-  final static String DEFAULT_SEPARATOR = "\t";
-  final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
-  final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
-  final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
-  public final static String CREATE_TABLE_CONF_KEY = "create.table";
-  public final static String NO_STRICT_COL_FAMILY = "no.strict";
-  /**
-   * If table didn't exist and was created in dry-run mode, this flag is
-   * flipped to delete it when MR ends.
-   */
-  private static boolean DRY_RUN_TABLE_CREATED;
-
-  public static class TsvParser {
-    /**
-     * Column families and qualifiers mapped to the TSV columns
-     */
-    private final byte[][] families;
-    private final byte[][] qualifiers;
-
-    private final byte separatorByte;
-
-    private int rowKeyColumnIndex;
-
-    private int maxColumnCount;
-
-    // Default value must be negative
-    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
-
-    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
-
-    public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
-
-    public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
-
-    public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
-
-    public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
-
-    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
-
-    private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
-
-    public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
-
-    public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
-
-    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
-
-    private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-
-    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
-
-    /**
-     * @param columnsSpecification the list of columns to parser out, comma separated.
-     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
-     * @param separatorStr
-     */
-    public TsvParser(String columnsSpecification, String separatorStr) {
-      // Configure separator
-      byte[] separator = Bytes.toBytes(separatorStr);
-      Preconditions.checkArgument(separator.length == 1,
-        "TsvParser only supports single-byte separators");
-      separatorByte = separator[0];
-
-      // Configure columns
-      ArrayList<String> columnStrings = Lists.newArrayList(
-        Splitter.on(',').trimResults().split(columnsSpecification));
-
-      maxColumnCount = columnStrings.size();
-      families = new byte[maxColumnCount][];
-      qualifiers = new byte[maxColumnCount][];
-
-      for (int i = 0; i < columnStrings.size(); i++) {
-        String str = columnStrings.get(i);
-        if (ROWKEY_COLUMN_SPEC.equals(str)) {
-          rowKeyColumnIndex = i;
-          continue;
-        }
-        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
-          timestampKeyColumnIndex = i;
-          continue;
-        }
-        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
-          attrKeyColumnIndex = i;
-          continue;
-        }
-        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
-          cellVisibilityColumnIndex = i;
-          continue;
-        }
-        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
-          cellTTLColumnIndex = i;
-          continue;
-        }
-        String[] parts = str.split(":", 2);
-        if (parts.length == 1) {
-          families[i] = str.getBytes();
-          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
-        } else {
-          families[i] = parts[0].getBytes();
-          qualifiers[i] = parts[1].getBytes();
-        }
-      }
-    }
-
-    public boolean hasTimestamp() {
-      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
-    }
-
-    public int getTimestampKeyColumnIndex() {
-      return timestampKeyColumnIndex;
-    }
-
-    public boolean hasAttributes() {
-      return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
-    }
-
-    public boolean hasCellVisibility() {
-      return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-    }
-
-    public boolean hasCellTTL() {
-      return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-    }
-
-    public int getAttributesKeyColumnIndex() {
-      return attrKeyColumnIndex;
-    }
-
-    public int getCellVisibilityColumnIndex() {
-      return cellVisibilityColumnIndex;
-    }
-
-    public int getCellTTLColumnIndex() {
-      return cellTTLColumnIndex;
-    }
-
-    public int getRowKeyColumnIndex() {
-      return rowKeyColumnIndex;
-    }
-
-    public byte[] getFamily(int idx) {
-      return families[idx];
-    }
-    public byte[] getQualifier(int idx) {
-      return qualifiers[idx];
-    }
-
-    public ParsedLine parse(byte[] lineBytes, int length)
-    throws BadTsvLineException {
-      // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount);
-      for (int i = 0; i < length; i++) {
-        if (lineBytes[i] == separatorByte) {
-          tabOffsets.add(i);
-        }
-      }
-      if (tabOffsets.isEmpty()) {
-        throw new BadTsvLineException("No delimiter");
-      }
-
-      tabOffsets.add(length);
-
-      if (tabOffsets.size() > maxColumnCount) {
-        throw new BadTsvLineException("Excessive columns");
-      } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
-        throw new BadTsvLineException("No row key");
-      } else if (hasTimestamp()
-          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
-        throw new BadTsvLineException("No timestamp");
-      } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
-        throw new BadTsvLineException("No attributes specified");
-      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
-        throw new BadTsvLineException("No cell visibility specified");
-      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
-        throw new BadTsvLineException("No cell TTL specified");
-      }
-      return new ParsedLine(tabOffsets, lineBytes);
-    }
-
-    class ParsedLine {
-      private final ArrayList<Integer> tabOffsets;
-      private byte[] lineBytes;
-
-      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
-        this.tabOffsets = tabOffsets;
-        this.lineBytes = lineBytes;
-      }
-
-      public int getRowKeyOffset() {
-        return getColumnOffset(rowKeyColumnIndex);
-      }
-      public int getRowKeyLength() {
-        return getColumnLength(rowKeyColumnIndex);
-      }
-
-      public long getTimestamp(long ts) throws BadTsvLineException {
-        // Return ts if HBASE_TS_KEY is not configured in column spec
-        if (!hasTimestamp()) {
-          return ts;
-        }
-
-        String timeStampStr = Bytes.toString(lineBytes,
-            getColumnOffset(timestampKeyColumnIndex),
-            getColumnLength(timestampKeyColumnIndex));
-        try {
-          return Long.parseLong(timeStampStr);
-        } catch (NumberFormatException nfe) {
-          // treat this record as bad record
-          throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
-        }
-      }
-
-      private String getAttributes() {
-        if (!hasAttributes()) {
-          return null;
-        } else {
-          return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
-              getColumnLength(attrKeyColumnIndex));
-        }
-      }
-
-      public String[] getIndividualAttributes() {
-        String attributes = getAttributes();
-        if (attributes != null) {
-          return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
-        } else {
-          return null;
-        }
-      }
-
-      public int getAttributeKeyOffset() {
-        if (hasAttributes()) {
-          return getColumnOffset(attrKeyColumnIndex);
-        } else {
-          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
-        }
-      }
-
-      public int getAttributeKeyLength() {
-        if (hasAttributes()) {
-          return getColumnLength(attrKeyColumnIndex);
-        } else {
-          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
-        }
-      }
-
-      public int getCellVisibilityColumnOffset() {
-        if (hasCellVisibility()) {
-          return getColumnOffset(cellVisibilityColumnIndex);
-        } else {
-          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-        }
-      }
-
-      public int getCellVisibilityColumnLength() {
-        if (hasCellVisibility()) {
-          return getColumnLength(cellVisibilityColumnIndex);
-        } else {
-          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-        }
-      }
-
-      public String getCellVisibility() {
-        if (!hasCellVisibility()) {
-          return null;
-        } else {
-          return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
-              getColumnLength(cellVisibilityColumnIndex));
-        }
-      }
-
-      public int getCellTTLColumnOffset() {
-        if (hasCellTTL()) {
-          return getColumnOffset(cellTTLColumnIndex);
-        } else {
-          return DEFAULT_CELL_TTL_COLUMN_INDEX;
-        }
-      }
-
-      public int getCellTTLColumnLength() {
-        if (hasCellTTL()) {
-          return getColumnLength(cellTTLColumnIndex);
-        } else {
-          return DEFAULT_CELL_TTL_COLUMN_INDEX;
-        }
-      }
-
-      public long getCellTTL() {
-        if (!hasCellTTL()) {
-          return 0;
-        } else {
-          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
-              getColumnLength(cellTTLColumnIndex));
-        }
-      }
-
-      public int getColumnOffset(int idx) {
-        if (idx > 0)
-          return tabOffsets.get(idx - 1) + 1;
-        else
-          return 0;
-      }
-      public int getColumnLength(int idx) {
-        return tabOffsets.get(idx) - getColumnOffset(idx);
-      }
-      public int getColumnCount() {
-        return tabOffsets.size();
-      }
-      public byte[] getLineBytes() {
-        return lineBytes;
-      }
-    }
-
-    public static class BadTsvLineException extends Exception {
-      public BadTsvLineException(String err) {
-        super(err);
-      }
-      private static final long serialVersionUID = 1L;
-    }
-
-    /**
-     * Return starting position and length of row key from the specified line bytes.
-     * @param lineBytes
-     * @param length
-     * @return Pair of row key offset and length.
-     * @throws BadTsvLineException
-     */
-    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
-        throws BadTsvLineException {
-      int rkColumnIndex = 0;
-      int startPos = 0, endPos = 0;
-      for (int i = 0; i <= length; i++) {
-        if (i == length || lineBytes[i] == separatorByte) {
-          endPos = i - 1;
-          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
-            if ((endPos + 1) == startPos) {
-              throw new BadTsvLineException("Empty value for ROW KEY.");
-            }
-            break;
-          } else {
-            startPos = endPos + 2;
-          }
-        }
-        if (i == length) {
-          throw new BadTsvLineException(
-              "Row key does not exist as number of columns in the line"
-                  + " are less than row key position.");
-        }
-      }
-      return new Pair<>(startPos, endPos - startPos + 1);
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  protected static Job createSubmittableJob(Configuration conf, String[] args)
-      throws IOException, ClassNotFoundException {
-    Job job = null;
-    boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
-    try (Connection connection = ConnectionFactory.createConnection(conf)) {
-      try (Admin admin = connection.getAdmin()) {
-        // Support non-XML supported characters
-        // by re-encoding the passed separator as a Base64 string.
-        String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
-        if (actualSeparator != null) {
-          conf.set(SEPARATOR_CONF_KEY,
-              Base64.encodeBytes(actualSeparator.getBytes()));
-        }
-
-        // See if a non-default Mapper was set
-        String mapperClassName = conf.get(MAPPER_CONF_KEY);
-        Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER;
-
-        TableName tableName = TableName.valueOf(args[0]);
-        Path inputDir = new Path(args[1]);
-        String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
-        job = Job.getInstance(conf, jobName);
-        job.setJarByClass(mapperClass);
-        FileInputFormat.setInputPaths(job, inputDir);
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setMapperClass(mapperClass);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-        String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
-        if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
-          String fileLoc = conf.get(CREDENTIALS_LOCATION);
-          Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
-          job.getCredentials().addAll(cred);
-        }
-
-        if (hfileOutPath != null) {
-          if (!admin.tableExists(tableName)) {
-            LOG.warn(format("Table '%s' does not exist.", tableName));
-            if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
-              // TODO: this is backwards. Instead of depending on the existence of a table,
-              // create a sane splits file for HFileOutputFormat based on data sampling.
-              createTable(admin, tableName, columns);
-              if (isDryRun) {
-                LOG.warn("Dry run: Table will be deleted at end of dry run.");
-                synchronized (ImportTsv.class) {
-                  DRY_RUN_TABLE_CREATED = true;
-                }
-              }
-            } else {
-              String errorMsg =
-                  format("Table '%s' does not exist and '%s' is set to no.", tableName,
-                      CREATE_TABLE_CONF_KEY);
-              LOG.error(errorMsg);
-              throw new TableNotFoundException(errorMsg);
-            }
-          }
-          try (Table table = connection.getTable(tableName);
-              RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
-            boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
-            // if no.strict is false then check column family
-            if(!noStrict) {
-              ArrayList<String> unmatchedFamilies = new ArrayList<>();
-              Set<String> cfSet = getColumnFamilies(columns);
-              TableDescriptor tDesc = table.getDescriptor();
-              for (String cf : cfSet) {
-                if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
-                  unmatchedFamilies.add(cf);
-                }
-              }
-              if(unmatchedFamilies.size() > 0) {
-                ArrayList<String> familyNames = new ArrayList<>();
-                for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) {
-                  familyNames.add(family.getNameAsString());
-                }
-                String msg =
-                    "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
-                    + " does not match with any of the table " + tableName
-                    + " column families " + familyNames + ".\n"
-                    + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
-                    + "=true.\n";
-                usage(msg);
-                System.exit(-1);
-              }
-            }
-            if (mapperClass.equals(TsvImporterTextMapper.class)) {
-              job.setMapOutputValueClass(Text.class);
-              job.setReducerClass(TextSortReducer.class);
-            } else {
-              job.setMapOutputValueClass(Put.class);
-              job.setCombinerClass(PutCombiner.class);
-              job.setReducerClass(PutSortReducer.class);
-            }
-            if (!isDryRun) {
-              Path outputDir = new Path(hfileOutPath);
-              FileOutputFormat.setOutputPath(job, outputDir);
-              HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(),
-                  regionLocator);
-            }
-          }
-        } else {
-          if (!admin.tableExists(tableName)) {
-            String errorMsg = format("Table '%s' does not exist.", tableName);
-            LOG.error(errorMsg);
-            throw new TableNotFoundException(errorMsg);
-          }
-          if (mapperClass.equals(TsvImporterTextMapper.class)) {
-            usage(TsvImporterTextMapper.class.toString()
-                + " should not be used for non bulkloading case. use "
-                + TsvImporterMapper.class.toString()
-                + " or custom mapper whose value type is Put.");
-            System.exit(-1);
-          }
-          if (!isDryRun) {
-            // No reducers. Just write straight to table. Call initTableReducerJob
-            // to set up the TableOutputFormat.
-            TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
-          }
-          job.setNumReduceTasks(0);
-        }
-        if (isDryRun) {
-          job.setOutputFormatClass(NullOutputFormat.class);
-          job.getConfiguration().setStrings("io.serializations",
-              job.getConfiguration().get("io.serializations"),
-              MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-              KeyValueSerialization.class.getName());
-        }
-        TableMapReduceUtil.addDependencyJars(job);
-        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-            org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */);
-      }
-    }
-    return job;
-  }
-
-  private static void createTable(Admin admin, TableName tableName, String[] columns)
-      throws IOException {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    Set<String> cfSet = getColumnFamilies(columns);
-    for (String cf : cfSet) {
-      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
-      htd.addFamily(hcd);
-    }
-    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
-      tableName, cfSet));
-    admin.createTable(htd);
-  }
-
-  private static void deleteTable(Configuration conf, String[] args) {
-    TableName tableName = TableName.valueOf(args[0]);
-    try (Connection connection = ConnectionFactory.createConnection(conf);
-         Admin admin = connection.getAdmin()) {
-      try {
-        admin.disableTable(tableName);
-      } catch (TableNotEnabledException e) {
-        LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
-      }
-      admin.deleteTable(tableName);
-    } catch (IOException e) {
-      LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
-          e.toString()));
-      return;
-    }
-    LOG.info(format("Dry run: Deleted table '%s'.", tableName));
-  }
-
-  private static Set<String> getColumnFamilies(String[] columns) {
-    Set<String> cfSet = new HashSet<>();
-    for (String aColumn : columns) {
-      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
-          || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
-          || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
-          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
-          || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
-        continue;
-      // we are only concerned with the first one (in case this is a cf:cq)
-      cfSet.add(aColumn.split(":", 2)[0]);
-    }
-    return cfSet;
-  }
-
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    String usage =
-      "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
-      "\n" +
-      "Imports the given input directory of TSV data into the specified table.\n" +
-      "\n" +
-      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
-      "option. This option takes the form of comma-separated column names, where each\n" +
-      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
-      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
-      "as the row key for each imported record. You must specify exactly one column\n" +
-      "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
-      " designates that this column should be\n" +
-      "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
-      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
-      "You must specify at most one column as timestamp key for each imported record.\n" +
-      "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
-      "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
-      "\n" +
-      "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC +
-      " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" +
-      TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " +
-      "as a Cell's Time To Live (TTL) attribute.\n" +
-      TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " +
-      "visibility label expression.\n" +
-      "\n" +
-      TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+
-      " Should be specified as key=>value where "+TsvParser.DEFAULT_ATTRIBUTES_COLUMN_INDEX+ " is used \n"+
-      " as the seperator.  Note that more than one OperationAttributes can be specified.\n"+
-      "By default importtsv will load data directly into HBase. To instead generate\n" +
-      "HFiles of data to prepare for a bulk data load, pass the option:\n" +
-      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
-      "  Note: if you do not use this option, then the target table must already exist in HBase\n" +
-      "\n" +
-      "Other options that may be specified with -D include:\n" +
-      "  -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
-      " table. If table does not exist, it is created but deleted in the end.\n" +
-      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
-      "  -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
-      "  -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
-      "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
-      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
-      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
-      DEFAULT_MAPPER.getName() + "\n" +
-      "  -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
-      "  -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
-      "  Note: if you set this to 'no', then the target table must already exist in HBase\n" +
-      "  -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
-      "Default is false\n\n" +
-      "For performance consider the following options:\n" +
-      "  -Dmapreduce.map.speculative=false\n" +
-      "  -Dmapreduce.reduce.speculative=false";
-
-    System.err.println(usage);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      return -1;
-    }
-
-    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
-    // perform validation on these additional args. When it's not null, user has provided their
-    // own mapper, thus these validation are not relevant.
-    // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
-    if (null == getConf().get(MAPPER_CONF_KEY)) {
-      // Make sure columns are specified
-      String[] columns = getConf().getStrings(COLUMNS_CONF_KEY);
-      if (columns == null) {
-        usage("No columns specified. Please specify with -D" +
-            COLUMNS_CONF_KEY+"=...");
-        return -1;
-      }
-
-      // Make sure they specify exactly one column as the row key
-      int rowkeysFound = 0;
-      for (String col : columns) {
-        if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
-      }
-      if (rowkeysFound != 1) {
-        usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
-        return -1;
-      }
-
-      // Make sure we have at most one column as the timestamp key
-      int tskeysFound = 0;
-      for (String col : columns) {
-        if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
-          tskeysFound++;
-      }
-      if (tskeysFound > 1) {
-        usage("Must specify at most one column as "
-            + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
-        return -1;
-      }
-
-      int attrKeysFound = 0;
-      for (String col : columns) {
-        if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
-          attrKeysFound++;
-      }
-      if (attrKeysFound > 1) {
-        usage("Must specify at most one column as "
-            + TsvParser.ATTRIBUTES_COLUMN_SPEC);
-        return -1;
-      }
-
-      // Make sure one or more columns are specified excluding rowkey and
-      // timestamp key
-      if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
-        usage("One or more columns in addition to the row key and timestamp(optional) are required");
-        return -1;
-      }
-    }
-
-    // If timestamp option is not specified, use current system time.
-    long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
-
-    // Set it back to replace invalid timestamp (non-numeric) with current
-    // system time
-    getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
-
-    synchronized (ImportTsv.class) {
-      DRY_RUN_TABLE_CREATED = false;
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    boolean success = job.waitForCompletion(true);
-    boolean delete = false;
-    synchronized (ImportTsv.class) {
-      delete = DRY_RUN_TABLE_CREATED;
-    }
-    if (delete) {
-      deleteTable(getConf(), args);
-    }
-    return success ? 0 : 1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args);
-    System.exit(status);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
deleted file mode 100644
index 953df62..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.text.MessageFormat;
-import java.util.Enumeration;
-import java.util.jar.JarFile;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-/**
- * Finds the Jar for a class. If the class is in a directory in the
- * classpath, it creates a Jar on the fly with the contents of the directory
- * and returns the path to that Jar. If a Jar is created, it is created in
- * the system temporary directory.
- *
- * This file was forked from hadoop/common/branches/branch-2@1377176.
- */
-public class JarFinder {
-
-  private static void copyToZipStream(File file, ZipEntry entry,
-                              ZipOutputStream zos) throws IOException {
-    InputStream is = new FileInputStream(file);
-    try {
-      zos.putNextEntry(entry);
-      byte[] arr = new byte[4096];
-      int read = is.read(arr);
-      while (read > -1) {
-        zos.write(arr, 0, read);
-        read = is.read(arr);
-      }
-    } finally {
-      try {
-        is.close();
-      } finally {
-        zos.closeEntry();
-      }
-    }
-  }
-
-  public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
-    throws IOException {
-    Preconditions.checkNotNull(relativePath, "relativePath");
-    Preconditions.checkNotNull(zos, "zos");
-
-    // by JAR spec, if there is a manifest, it must be the first entry in the
-    // ZIP.
-    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
-    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
-    if (!manifestFile.exists()) {
-      zos.putNextEntry(manifestEntry);
-      new Manifest().write(new BufferedOutputStream(zos));
-      zos.closeEntry();
-    } else {
-      copyToZipStream(manifestFile, manifestEntry, zos);
-    }
-    zos.closeEntry();
-    zipDir(dir, relativePath, zos, true);
-    zos.close();
-  }
-
-  private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
-                             boolean start) throws IOException {
-    String[] dirList = dir.list();
-    if (dirList == null) {
-      return;
-    }
-    for (String aDirList : dirList) {
-      File f = new File(dir, aDirList);
-      if (!f.isHidden()) {
-        if (f.isDirectory()) {
-          if (!start) {
-            ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
-            zos.putNextEntry(dirEntry);
-            zos.closeEntry();
-          }
-          String filePath = f.getPath();
-          File file = new File(filePath);
-          zipDir(file, relativePath + f.getName() + "/", zos, false);
-        }
-        else {
-          String path = relativePath + f.getName();
-          if (!path.equals(JarFile.MANIFEST_NAME)) {
-            ZipEntry anEntry = new ZipEntry(path);
-            copyToZipStream(f, anEntry, zos);
-          }
-        }
-      }
-    }
-  }
-
-  private static void createJar(File dir, File jarFile) throws IOException {
-    Preconditions.checkNotNull(dir, "dir");
-    Preconditions.checkNotNull(jarFile, "jarFile");
-    File jarDir = jarFile.getParentFile();
-    if (!jarDir.exists()) {
-      if (!jarDir.mkdirs()) {
-        throw new IOException(MessageFormat.format("could not create dir [{0}]",
-                                                   jarDir));
-      }
-    }
-    try (FileOutputStream fos = new FileOutputStream(jarFile);
-         JarOutputStream jos = new JarOutputStream(fos)) {
-      jarDir(dir, "", jos);
-    }
-  }
-
-  /**
-   * Returns the full path to the Jar containing the class. It always return a
-   * JAR.
-   *
-   * @param klass class.
-   *
-   * @return path to the Jar containing the class.
-   */
-  public static String getJar(Class klass) {
-    Preconditions.checkNotNull(klass, "klass");
-    ClassLoader loader = klass.getClassLoader();
-    if (loader != null) {
-      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
-      try {
-        for (Enumeration itr = loader.getResources(class_file);
-             itr.hasMoreElements(); ) {
-          URL url = (URL) itr.nextElement();
-          String path = url.getPath();
-          if (path.startsWith("file:")) {
-            path = path.substring("file:".length());
-          }
-          path = URLDecoder.decode(path, "UTF-8");
-          if ("jar".equals(url.getProtocol())) {
-            path = URLDecoder.decode(path, "UTF-8");
-            return path.replaceAll("!.*$", "");
-          }
-          else if ("file".equals(url.getProtocol())) {
-            String klassName = klass.getName();
-            klassName = klassName.replace(".", "/") + ".class";
-            path = path.substring(0, path.length() - klassName.length());
-            File baseDir = new File(path);
-            File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
-            testDir = testDir.getAbsoluteFile();
-            if (!testDir.exists()) {
-              testDir.mkdirs();
-            }
-            File tempJar = File.createTempFile("hadoop-", "", testDir);
-            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
-            tempJar.deleteOnExit();
-            createJar(baseDir, tempJar);
-            return tempJar.getAbsolutePath();
-          }
-        }
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
deleted file mode 100644
index 241608b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class KeyValueSerialization implements Serialization<KeyValue> {
-  @Override
-  public boolean accept(Class<?> c) {
-    return KeyValue.class.isAssignableFrom(c);
-  }
-
-  @Override
-  public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
-    return new KeyValueDeserializer();
-  }
-
-  @Override
-  public KeyValueSerializer getSerializer(Class<KeyValue> c) {
-    return new KeyValueSerializer();
-  }
-
-  public static class KeyValueDeserializer implements Deserializer<KeyValue> {
-    private DataInputStream dis;
-
-    @Override
-    public void close() throws IOException {
-      this.dis.close();
-    }
-
-    @Override
-    public KeyValue deserialize(KeyValue ignore) throws IOException {
-      // I can't overwrite the passed in KV, not from a proto kv, not just yet.  TODO
-      return KeyValueUtil.create(this.dis);
-    }
-
-    @Override
-    public void open(InputStream is) throws IOException {
-      this.dis = new DataInputStream(is);
-    }
-  }
-
-  public static class KeyValueSerializer implements Serializer<KeyValue> {
-    private DataOutputStream dos;
-
-    @Override
-    public void close() throws IOException {
-      this.dos.close();
-    }
-
-    @Override
-    public void open(OutputStream os) throws IOException {
-      this.dos = new DataOutputStream(os);
-    }
-
-    @Override
-    public void serialize(KeyValue kv) throws IOException {
-      KeyValueUtil.write(kv, this.dos);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
deleted file mode 100644
index 5c7ace2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * Emits sorted KeyValues.
- * Reads in all KeyValues from passed Iterator, sorts them, then emits
- * KeyValues in sorted order.  If lots of columns per row, it will use lots of
- * memory sorting.
- * @see HFileOutputFormat2
- */
-@InterfaceAudience.Public
-public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
-      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
-  throws java.io.IOException, InterruptedException {
-    TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
-    for (KeyValue kv: kvs) {
-      try {
-        map.add(kv.clone());
-      } catch (CloneNotSupportedException e) {
-        throw new java.io.IOException(e);
-      }
-    }
-    context.setStatus("Read " + map.getClass());
-    int index = 0;
-    for (KeyValue kv: map) {
-      context.write(row, kv);
-      if (++index % 100 == 0) context.setStatus("Wrote " + index);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
deleted file mode 100644
index d7c7cc0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Create 3 level tree directory, first level is using table name as parent
- * directory and then use family name as child directory, and all related HFiles
- * for one family are under child directory
- * -tableName1
- *     -columnFamilyName1
- *     -columnFamilyName2
- *         -HFiles
- * -tableName2
- *     -columnFamilyName1
- *         -HFiles
- *     -columnFamilyName2
- */
-@InterfaceAudience.Public
-@VisibleForTesting
-public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
-  private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
-
-  /**
-   * Creates a composite key to use as a mapper output key when using
-   * MultiTableHFileOutputFormat.configureIncrementaLoad to set up bulk ingest job
-   *
-   * @param tableName Name of the Table - Eg: TableName.getNameAsString()
-   * @param suffix    Usually represents a rowkey when creating a mapper key or column family
-   * @return          byte[] representation of composite key
-   */
-  public static byte[] createCompositeKey(byte[] tableName,
-                                          byte[] suffix) {
-    return combineTableNameSuffix(tableName, suffix);
-  }
-
-  /**
-   * Alternate api which accepts an ImmutableBytesWritable for the suffix
-   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
-   */
-  public static byte[] createCompositeKey(byte[] tableName,
-                                          ImmutableBytesWritable suffix) {
-    return combineTableNameSuffix(tableName, suffix.get());
-  }
-
-  /**
-   * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the
-   * suffix
-   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
-   */
-  public static byte[] createCompositeKey(String tableName,
-                                          ImmutableBytesWritable suffix) {
-    return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
-  }
-
-  /**
-   * Analogous to
-   * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)},
-   * this function will configure the requisite number of reducers to write HFiles for multple
-   * tables simultaneously
-   *
-   * @param job                   See {@link org.apache.hadoop.mapreduce.Job}
-   * @param multiTableDescriptors Table descriptor and region locator pairs
-   * @throws IOException
-   */
-  public static void configureIncrementalLoad(Job job, List<TableInfo>
-      multiTableDescriptors)
-      throws IOException {
-    MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
-            MultiTableHFileOutputFormat.class);
-  }
-
-  final private static int validateCompositeKey(byte[] keyBytes) {
-
-    int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
-
-    // Either the separator was not found or a tablename wasn't present or a key wasn't present
-    if (separatorIdx == -1) {
-      throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
-              .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
-    }
-    return separatorIdx;
-  }
-
-  protected static byte[] getTableName(byte[] keyBytes) {
-    int separatorIdx = validateCompositeKey(keyBytes);
-    return Bytes.copy(keyBytes, 0, separatorIdx);
-  }
-
-  protected static byte[] getSuffix(byte[] keyBytes) {
-    int separatorIdx = validateCompositeKey(keyBytes);
-    return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
deleted file mode 100644
index a8e6837..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Convert HBase tabular data from multiple scanners into a format that 
- * is consumable by Map/Reduce.
- *
- * <p>
- * Usage example
- * </p>
- *
- * <pre>
- * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
- * 
- * Scan scan1 = new Scan();
- * scan1.setStartRow(firstRow1);
- * scan1.setStopRow(lastRow1);
- * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
- * scans.add(scan1);
- *
- * Scan scan2 = new Scan();
- * scan2.setStartRow(firstRow2);
- * scan2.setStopRow(lastRow2);
- * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
- * scans.add(scan2);
- *
- * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
- *     IntWritable.class, job);
- * </pre>
- */
-@InterfaceAudience.Public
-public class MultiTableInputFormat extends MultiTableInputFormatBase implements
-    Configurable {
-
-  /** Job parameter that specifies the scan list. */
-  public static final String SCANS = "hbase.mapreduce.scans";
-
-  /** The configuration. */
-  private Configuration conf = null;
-
-  /**
-   * Returns the current configuration.
-   *
-   * @return The current configuration.
-   * @see org.apache.hadoop.conf.Configurable#getConf()
-   */
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Sets the configuration. This is used to set the details for the tables to
-   *  be scanned.
-   *
-   * @param configuration The configuration to set.
-   * @see org.apache.hadoop.conf.Configurable#setConf(
-   *        org.apache.hadoop.conf.Configuration)
-   */
-  @Override
-  public void setConf(Configuration configuration) {
-    this.conf = configuration;
-    String[] rawScans = conf.getStrings(SCANS);
-    if (rawScans.length <= 0) {
-      throw new IllegalArgumentException("There must be at least 1 scan configuration set to : "
-          + SCANS);
-    }
-    List<Scan> scans = new ArrayList<>();
-
-    for (int i = 0; i < rawScans.length; i++) {
-      try {
-        scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to convert Scan : " + rawScans[i] + " to string", e);
-      }
-    }
-    this.setScans(scans);
-  }
-}