You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/10/15 07:33:12 UTC

svn commit: r1022825 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: mapreduce/ zookeeper/

Author: stack
Date: Fri Oct 15 05:33:11 2010
New Revision: 1022825

URL: http://svn.apache.org/viewvc?rev=1022825&view=rev
Log:
HBASE-3111 TestTableMapReduce broken up on hudson


M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
  Changed mentions of 'quorum' to 'ensemble'.
M src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
  Minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
  Removed unused imports and minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
  Documented what the quorumAddress parameter is for.
  Removed an unnecessary-looking HBaseConfiguration.addHbaseResources(conf);
  (and adjusted the documentation of the job parameter to say that the
  reduce setup method does not add any HBase configuration).
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
  Made this class implement Configurable.  Moved creation of the table out of
  getRecordWriter, where it was based off the passed TaskAttemptContext,
  and into the new setConf method.  Added table and conf data members.

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java Fri Oct 15 05:33:11 2010
@@ -69,11 +69,10 @@ extends TableReducer<Writable, Writable,
    * @throws InterruptedException When the job gets interrupted.
    */
   @Override
-  public void reduce(Writable key, Iterable<Writable> values,
-      Context context) throws IOException, InterruptedException {
+  public void reduce(Writable key, Iterable<Writable> values, Context context)
+  throws IOException, InterruptedException {
     for(Writable putOrDelete : values) {
       context.write(key, putOrDelete);
     }
   }
-
-}
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Fri Oct 15 05:33:11 2010
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -141,5 +140,4 @@ implements Configurable {
 
     setScan(scan);
   }
-
-}
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Fri Oct 15 05:33:11 2010
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +36,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -189,8 +187,6 @@ extends InputFormat<ImmutableBytesWritab
     return true;
   }
 
-
-
   /**
    * Allows subclasses to get the {@link HTable}.
    */
@@ -235,5 +231,4 @@ extends InputFormat<ImmutableBytesWritab
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
-
-}
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri Oct 15 05:33:11 2010
@@ -26,17 +26,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLDecoder;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,7 +44,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -64,13 +61,15 @@ public class TableMapReduceUtil {
    * @param mapper  The mapper class to use.
    * @param outputKeyClass  The class of the output key.
    * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @throws IOException When setting up the details fails.
    */
   public static void initTableMapperJob(String table, Scan scan,
       Class<? extends TableMapper> mapper,
       Class<? extends WritableComparable> outputKeyClass,
-      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+      Class<? extends Writable> outputValueClass, Job job)
+  throws IOException {
     job.setInputFormatClass(TableInputFormat.class);
     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
@@ -148,10 +147,18 @@ public class TableMapReduceUtil {
    *
    * @param table  The output table.
    * @param reducer  The reducer class to use.
-   * @param job  The current job to adjust.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
    * default partitioner.
-   * @param quorumAddress Distant cluster to write to
+   * @param quorumAddress Distant cluster to write to; default is null for
+   * output to the cluster that is designated in <code>hbase-site.xml</code>.
+   * Set this String to the zookeeper ensemble of an alternate remote cluster
+   * when you would have the reduce write a cluster that is other than the
+   * default; e.g. copying tables between clusters, the source would be
+   * designated by <code>hbase-site.xml</code> and this param would have the
+   * ensemble address of the remote cluster.  The format to pass is particular.
+   * Pass <code> &lt;hbase.zookeeper.quorum> ':' &lt;ZOOKEEPER_ZNODE_PARENT></code>.
    * @param serverClass redefined hbase.regionserver.class
    * @param serverImpl redefined hbase.regionserver.impl
    * @throws IOException When determining the region count fails.
@@ -165,12 +172,14 @@ public class TableMapReduceUtil {
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       if (quorumAddress.split(":").length == 2) {
         conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
       } else {
-        throw new IOException("Please specify the peer cluster as " +
-            HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+        // Not in expected format.
+        throw new IOException("Please specify the peer cluster using the format of " +
+          HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
       }
     }
     if (serverClass != null && serverImpl != null) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Fri Oct 15 05:33:11 2010
@@ -23,7 +23,8 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
@@ -34,7 +35,6 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -43,13 +43,22 @@ import org.apache.hadoop.conf.Configurat
  *
  * @param <KEY>  The type of the key. Ignored in this class.
  */
-public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
+implements Configurable {
 
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
-  /** Optional job parameter to specify a peer cluster */
+
+  /**
+   * Optional job parameter to specify a peer cluster.
+   * Used specifying remote cluster when copying between hbase clusters (the
+   * source is picked up from <code>hbase-site.xml</code>).
+   * @see {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)}
+   */
   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+
   /** Optional specification of the rs class name of the peer cluster */
   public static final String
       REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
@@ -57,6 +66,11 @@ public class TableOutputFormat<KEY> exte
   public static final String
       REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
 
+  /** The configuration. */
+  private Configuration conf = null;
+
+  private HTable table;
+
   /**
    * Writes the reducer output to an HBase table.
    *
@@ -120,32 +134,7 @@ public class TableOutputFormat<KEY> exte
   public RecordWriter<KEY, Writable> getRecordWriter(
     TaskAttemptContext context)
   throws IOException, InterruptedException {
-    // expecting exactly one path
-    Configuration conf = new Configuration(context.getConfiguration());
-    String tableName = conf.get(OUTPUT_TABLE);
-    String address = conf.get(QUORUM_ADDRESS);
-    String serverClass = conf.get(REGION_SERVER_CLASS);
-    String serverImpl = conf.get(REGION_SERVER_IMPL);
-    HTable table = null;
-    try {
-      HBaseConfiguration.addHbaseResources(conf);
-      if (address != null) {
-        // Check is done in TMRU
-        String[] parts = address.split(":");
-        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
-        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
-      }
-      if (serverClass != null) {
-        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
-        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
-      }
-      table = new HTable(conf, tableName);
-    } catch(IOException e) {
-      LOG.error(e);
-      throw e;
-    }
-    table.setAutoFlush(false);
-    return new TableRecordWriter<KEY>(table);
+    return new TableRecordWriter<KEY>(this.table);
   }
 
   /**
@@ -178,4 +167,32 @@ public class TableOutputFormat<KEY> exte
     return new TableOutputCommitter();
   }
 
-}
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    String tableName = conf.get(OUTPUT_TABLE);
+    String address = conf.get(QUORUM_ADDRESS);
+    String serverClass = conf.get(REGION_SERVER_CLASS);
+    String serverImpl = conf.get(REGION_SERVER_IMPL);
+    try {
+      if (address != null) {
+        // Check is done in TMRU
+        String[] parts = address.split(":");
+        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+      }
+      if (serverClass != null) {
+        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      this.table = new HTable(conf, tableName);
+      table.setAutoFlush(false);
+      LOG.info("Created table instance for "  + tableName);
+    } catch(IOException e) {
+      LOG.error(e);
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri Oct 15 05:33:11 2010
@@ -61,12 +61,12 @@ public class ZKUtil {
   private static final char ZNODE_PATH_SEPARATOR = '/';
 
   /**
-   * Creates a new connection to ZooKeeper, pulling settings and quorum config
+   * Creates a new connection to ZooKeeper, pulling settings and ensemble config
    * from the specified configuration object using methods from {@link ZKConfig}.
    *
    * Sets the connection status monitoring watcher to the specified watcher.
    *
-   * @param conf configuration to pull quorum and other settings from
+   * @param conf configuration to pull ensemble and other settings from
    * @param watcher watcher to monitor connection changes
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
@@ -74,26 +74,26 @@ public class ZKUtil {
   public static ZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
-    String quorum = ZKConfig.getZKQuorumServersString(properties);
-    return connect(conf, quorum, watcher);
+    String ensemble = ZKConfig.getZKQuorumServersString(properties);
+    return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
-    return connect(conf, quorum, watcher, "");
+    return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String quorum,
+  public static ZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
-    if(quorum == null) {
-      throw new IOException("Unable to determine ZooKeeper quorum");
+    if(ensemble == null) {
+      throw new IOException("Unable to determine ZooKeeper ensemble");
     }
     int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
-    LOG.info(descriptor + " opening connection to ZooKeeper with quorum (" +
-      quorum + ")");
-    return new ZooKeeper(quorum, timeout, watcher);
+    LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+        ensemble + ")");
+    return new ZooKeeper(ensemble, timeout, watcher);
   }
 
   //
@@ -164,9 +164,9 @@ public class ZKUtil {
    * @return ensemble key with a name (if any)
    */
   public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
+    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
         "[\\t\\n\\x0B\\f\\r]", ""));
-    StringBuilder builder = new StringBuilder(quorum);
+    StringBuilder builder = new StringBuilder(ensemble);
     builder.append(":");
     builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     if (name != null && !name.isEmpty()) {