You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2015/02/14 21:15:51 UTC

hbase git commit: HBASE-13028 Cleanup MapReduce InputFormats

Repository: hbase
Updated Branches:
  refs/heads/master 6f904fe4c -> 332515ed3


HBASE-13028 Cleanup MapReduce InputFormats


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/332515ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/332515ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/332515ed

Branch: refs/heads/master
Commit: 332515ed346e1ffc104ce3bac986bb7030747a03
Parents: 6f904fe
Author: Sean Busbey <bu...@apache.org>
Authored: Fri Feb 13 15:47:11 2015 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Sat Feb 14 14:13:36 2015 -0600

----------------------------------------------------------------------
 .../hadoop/hbase/mapred/TableInputFormat.java   |  18 +-
 .../hbase/mapred/TableInputFormatBase.java      | 186 ++++++++++++++++---
 .../hbase/mapreduce/TableInputFormat.java       |   4 +-
 .../hbase/mapreduce/TableInputFormatBase.java   | 119 +++++++-----
 .../hbase/mapred/TestTableInputFormat.java      |  72 ++++++-
 .../hbase/mapreduce/TestTableInputFormat.java   |  63 +++++--
 6 files changed, 364 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
index 368510f..814daea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -50,6 +49,15 @@ public class TableInputFormat extends TableInputFormatBase implements
   public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
 
   public void configure(JobConf job) {
+    try { // configure() declares no checked exceptions, so initialization failures are logged
+      initialize(job);
+    } catch (Exception e) { // base getSplits/getRecordReader re-run initialize(job) if table unset
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  protected void initialize(JobConf job) throws IOException {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
@@ -58,12 +66,8 @@ public class TableInputFormat extends TableInputFormatBase implements
       m_cols[i] = Bytes.toBytes(colNames[i]);
     }
     setInputColumns(m_cols);
-    try {
-      Connection connection = ConnectionFactory.createConnection(job);
-      setHTable((HTable) connection.getTable(TableName.valueOf(tableNames[0].getName())));
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
+    Connection connection = ConnectionFactory.createConnection(job);
+    initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
   }
 
   public void validateInput(JobConf job) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
index d98b5f4..b5b79d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -25,6 +26,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
@@ -40,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter;
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
  * byte[] of input columns and optionally a {@link Filter}.
  * Subclasses may use other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
  * <p>
  * An example of a subclass:
  * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *   class ExampleTIF extends TableInputFormatBase {
  *
  *     {@literal @}Override
- *     public void configure(JobConf job) {
- *       try {
- *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- *           Bytes.toBytes("exampleTable"));
- *         // mandatory
- *         setHTable(exampleTable);
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // mandatory
- *         setInputColumns(inputColumns);
- *         // optional, by default we'll get everything for the given columns.
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         setRowFilter(exampleFilter);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
+ *     protected void initialize(JobConf job) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       // optional, by default we'll get everything for the given columns.
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       setRowFilter(exampleFilter);
  *     }
  *   }
  * </pre>
@@ -74,9 +84,17 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
   private byte [][] inputColumns;
   private HTable table;
+  private Connection connection;
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
 
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -87,19 +105,63 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter)
   throws IOException {
+    // Only initialize when no table is set yet (e.g. not via setHTable or initializeTable).
+    if (table == null) {
+      initialize(job);
+    }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
     TableSplit tSplit = (TableSplit) split;
-    TableRecordReader trr = this.tableRecordReader;
     // if no table record reader was provided use default
-    if (trr == null) {
-      trr = new TableRecordReader();
-    }
+    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+        this.tableRecordReader;
     trr.setStartRow(tSplit.getStartRow());
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
     trr.setInputColumns(this.inputColumns);
     trr.setRowFilter(this.rowFilter);
     trr.init();
-    return trr;
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+      // Delegating wrapper: closing it also releases the table and connection via closeTable().
+      @Override
+      public void close() throws IOException {
+        trr.close(); // NOTE(review): if this throws, closeTable() below is skipped
+        closeTable();
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return trr.createKey();
+      }
+
+      @Override
+      public Result createValue() {
+        return trr.createValue();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return trr.getPos();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return trr.getProgress();
+      }
+
+      @Override
+      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+        return trr.next(key, value);
+      }
+    };
   }
 
   /**
@@ -123,8 +185,18 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     if (this.table == null) {
-      throw new IOException("No table was provided");
+      initialize(job);
     }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
     byte [][] startKeys = this.table.getStartKeys();
     if (startKeys == null || startKeys.length == 0) {
       throw new IOException("Expecting at least one region");
@@ -152,6 +224,22 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
+   * Allows subclasses to initialize the table information.
+   *
+   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException if the table cannot be obtained from {@code connection}
+   */
+  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (this.table != null || this.connection != null) { // check fields, not the parameter
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
+    this.table = (HTable) connection.getTable(tableName);
+    this.connection = connection;
+  }
+
+  /**
    * @param inputColumns to be passed in {@link Result} to the map task.
    */
   protected void setInputColumns(byte [][] inputColumns) {
@@ -160,8 +248,20 @@ implements InputFormat<ImmutableBytesWritable, Result> {
 
   /**
    * Allows subclasses to get the {@link HTable}.
+   * @deprecated use {@link #getTable()}
+   */
+  @Deprecated
+  protected HTable getHTable() {
+    return (HTable) getTable(); // may throw IllegalStateException, see getTable()
+  }
+
+  /**
+   * Allows subclasses to get the {@link Table}; throws IllegalStateException if not initialized.
    */
-  protected Table getHTable() {
+  protected Table getTable() {
+    if (table == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
     return this.table;
   }
 
@@ -169,7 +269,9 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to set the {@link HTable}.
    *
    * @param table to get the data from
+   * @deprecated use {@link #initializeTable(Connection,TableName)}
    */
+  @Deprecated // leaves the connection field unset, so closeTable() only closes the table
   protected void setHTable(HTable table) {
     this.table = table;
   }
@@ -192,4 +294,40 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   protected void setRowFilter(Filter rowFilter) {
     this.rowFilter = rowFilter;
   }
+
+  /**
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   * @param job the JobConf handed to the calling entry point
+   */
+  protected void initialize(JobConf job) throws IOException {
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException if closing the table or the connection fails
+   */
+  protected void closeTable() throws IOException {
+    close(table, connection); // NOTE(review): if this throws, the fields below stay set
+    table = null;
+    connection = null;
+  }
+  /** Closes each non-null argument in order; the first IOException aborts the rest. */
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 8896eb0..bc2537b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -175,7 +175,9 @@ implements Configurable {
   }
 
   @Override
-  protected void initialize() {
+  protected void initialize(JobContext context) throws IOException {
+    // Do we have to worry about mis-matches between the Configuration from setConf and the one
+    // in this context?
     TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
     try {
       initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index adfe493..6c42d7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -64,16 +64,28 @@ import org.apache.hadoop.util.StringUtils;
  * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
  * an {@link Scan} instance that defines the input columns etc. Subclasses may use
  * other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
  * <p>
  * An example of a subclass:
  * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
- *
- *     private JobConf job;
+ *   class ExampleTIF extends TableInputFormatBase {
  *
  *     {@literal @}Override
- *     public void configure(JobConf job) {
- *       this.job = job;
+ *     protected void initialize(JobContext context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ *              context.getConfiguration()));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
  *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
  *         Bytes.toBytes("columnB") };
  *       // optional, by default we'll get everything for the table.
@@ -85,23 +97,6 @@ import org.apache.hadoop.util.StringUtils;
  *       scan.setFilter(exampleFilter);
  *       setScan(scan);
  *     }
- *
- *     {@literal @}Override
- *     protected void initialize() {
- *       if (job == null) {
- *         throw new IllegalStateException("must have already gotten the JobConf before " +
- *             "initialize is called.");
- *       }
- *       try {
- *         Connection connection =
- *            ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *         TableName tableName = TableName.valueOf("exampleTable");
- *         // mandatory
- *         initializeTable(connection, tableName);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to initialize.", exception);
- *       }
- *     }
  *   }
  * </pre>
  */
@@ -122,6 +117,13 @@ extends InputFormat<ImmutableBytesWritable, Result> {
 
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
   /** Holds the details for the internal scanner.
    *
    * @see Scan */
@@ -158,14 +160,18 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
       InputSplit split, TaskAttemptContext context)
   throws IOException {
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
     }
-    if (getTable() == null) {
-      // initialize() must not have been implemented in the subclass.
-      throw new IOException("Cannot create a record reader because of a" +
-          " previous error. Please look at the previous logs lines from" +
-          " the task's full log for more details.");
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
     TableSplit tSplit = (TableSplit) split;
     LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
@@ -230,14 +236,20 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     boolean closeOnFinish = false;
 
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
       closeOnFinish = true;
     }
 
-    if (getTable() == null) {
-      // initialize() wasn't implemented, so the table is null.
-      throw new IOException("No table was provided.");
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
 
     try {
@@ -334,6 +346,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
   }
 
+  /**
+   * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
+   */
+  @Deprecated
   public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
     String hostName = this.reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
@@ -366,7 +382,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
    *   org.apache.hadoop.mapreduce.JobContext)
    */
-  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
+  private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
                                                long average) throws IOException {
     List<InputSplit> resultList = new ArrayList<InputSplit>();
     Configuration conf = context.getConfiguration();
@@ -440,6 +456,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * @param isText It determines to use text key mode or binary key mode
    * @return The split point in the region.
    */
+  @InterfaceAudience.Private
   public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
     byte upperLimitByte;
     byte lowerLimitByte;
@@ -519,8 +536,6 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
-   *
-   *
    * Test if the given region is to be included in the InputSplit while splitting
    * the regions of a table.
    * <p>
@@ -547,7 +562,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to get the {@link HTable}.
    *
-   * @deprecated
+   * @deprecated use {@link #getTable()}
    */
   @Deprecated
   protected HTable getHTable() {
@@ -559,7 +574,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected RegionLocator getRegionLocator() {
     if (regionLocator == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED); // no lazy initialize() anymore
     }
     return regionLocator;
   }
@@ -569,7 +584,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Table getTable() {
     if (table == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED); // callers wrap this in an IOException
     }
     return table;
   }
@@ -579,7 +594,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Admin getAdmin() {
     if (admin == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED); // no lazy initialize() anymore
     }
     return admin;
   }
@@ -587,6 +602,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to set the {@link HTable}.
    *
+   * Will attempt to reuse the underlying Connection for our own needs, including
+   * retrieving an Admin interface to the HBase cluster.
+   *
    * @param table  The table to get the data from.
    * @throws IOException 
    * @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
@@ -623,6 +641,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * @throws IOException 
    */
   protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (this.table != null || this.connection != null) { // check fields, not the parameter
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
     this.table = connection.getTable(tableName);
     this.regionLocator = connection.getRegionLocator(tableName);
     this.admin = connection.getAdmin();
@@ -659,12 +681,21 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
   
   /**
-   * This method will be called when any of the following are referenced, but not yet initialized:
-   * admin, regionLocator, table. Subclasses will have the opportunity to call
-   * {@link #initializeTable(Connection, TableName)}
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   * @param context the job context handed to the calling entry point
    */
-  protected void initialize() {
-   
+  protected void initialize(JobContext context) throws IOException {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
index 234a2e8..d7dd8ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
@@ -36,6 +36,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
@@ -322,8 +325,30 @@ public class TestTableInputFormat {
     LOG.info("testing use of an InputFormat taht extends InputFormatBase");
     final Table table = createTable(Bytes.toBytes("exampleTable"),
       new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleTIF.class);
+  }
+
+  @Test
+  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase, "
+        + "as it was given in 0.98.");
+    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), // read by the job
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleDeprecatedTIF.class);
+  }
+
+  @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase, "
+        + "using JobConfigurable.");
+    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), // read by the job
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
+  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
     final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
-    job.setInputFormat(ExampleTIF.class);
+    job.setInputFormat(clazz);
     job.setOutputFormat(NullOutputFormat.class);
     job.setMapperClass(ExampleVerifier.class);
     job.setNumReduceTasks(0);
@@ -373,13 +398,13 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
       try {
         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
-          Bytes.toBytes("exampleTable"));
+          Bytes.toBytes("exampleDeprecatedTable"));
         // mandatory
         setHTable(exampleTable);
         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
@@ -396,5 +421,46 @@ public class TestTableInputFormat {
 
   }
 
+  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+    // Calls initialize from configure(); base entry points skip initialize() while a table is set.
+    @Override
+    public void configure(JobConf job) {
+      try {
+        initialize(job);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleJobConfigurableTable");
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+    // Sets up table, input columns and row filter in initialize(JobConf) instead of configure().
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleTable");
+    }
+    // Shared helper so subclasses can read a different table name.
+    protected void initialize(JobConf job, String table) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+      TableName tableName = TableName.valueOf(table);
+      // mandatory; the base class takes ownership of this connection and closes it
+      initializeTable(connection, tableName);
+      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+        Bytes.toBytes("columnB") };
+      // mandatory
+      setInputColumns(inputColumns);
+      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+      // optional
+      setRowFilter(exampleFilter);
+    }
+
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
index 2602961..566a642 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
@@ -343,6 +344,16 @@ public class TestTableInputFormat {
   }
 
   @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase, using JobConfigurable.");
+    // ExampleJobConfigurableTIF reads this table; the returned Table handle is not needed here.
+    createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
+  @Test
   public void testDeprecatedExtensionOfTableInputFormatBase()
       throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
@@ -422,13 +433,43 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 
-    private JobConf job;
+  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
+      implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
-      this.job = job;
+      try {
+        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
+        // mandatory. NOTE(review): nothing here closes 'connection'; confirm the base class does.
+        initializeTable(connection, tableName);
+        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+          Bytes.toBytes("columnB") };
+        // optional: scan only the input families and rows whose key matches "aa.*"
+        Scan scan = new Scan();
+        for (byte[] family : inputColumns) {
+          scan.addFamily(family);
+        }
+        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+        scan.setFilter(exampleFilter);
+        setScan(scan);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+  }
+
+  /** TableInputFormatBase example that configures itself in {@code initialize(JobContext)}. */
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobContext job) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+          job.getConfiguration()));
+      TableName tableName = TableName.valueOf("exampleTable");
+      // mandatory. NOTE(review): nothing here closes 'connection'; confirm the base class does.
+      initializeTable(connection, tableName);
       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
         Bytes.toBytes("columnB") };
       //optional
@@ -441,22 +482,6 @@ public class TestTableInputFormat {
       setScan(scan);
     }
 
-    @Override
-    protected void initialize() {
-      if (job == null) {
-        throw new IllegalStateException("must have already gotten the JobConf before initialize " +
-            "is called.");
-      }
-      try {
-        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
-        TableName tableName = TableName.valueOf("exampleTable");
-        // mandatory
-        initializeTable(connection, tableName);
-      } catch (IOException exception) {
-        throw new RuntimeException("Failed to initialize.", exception);
-      }
-    }
-
   }
 }