You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/10/15 07:33:12 UTC
svn commit: r1022825 - in
/hbase/trunk/src/main/java/org/apache/hadoop/hbase: mapreduce/ zookeeper/
Author: stack
Date: Fri Oct 15 05:33:11 2010
New Revision: 1022825
URL: http://svn.apache.org/viewvc?rev=1022825&view=rev
Log:
HBASE-3111 TestTableMapReduce broken up on hudson
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
Changed mentions of 'quorum' to 'ensemble'.
M src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
Minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Removed unused imports and minor formatting.
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
Documented what the quorumAddress parameter is for.
Removed an unnecessary-looking HBaseConfiguration.addHbaseResources(conf) call
(and adjusted documentation of job to say no HBase config is set by the
reduce setup method).
M src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
Made this class implement Configurable. Moved creation of the table, which
was done at RecordWriter construction based off the passed TaskAttemptContext,
into the new setConf method. Added table and conf data members.
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java Fri Oct 15 05:33:11 2010
@@ -69,11 +69,10 @@ extends TableReducer<Writable, Writable,
* @throws InterruptedException When the job gets interrupted.
*/
@Override
- public void reduce(Writable key, Iterable<Writable> values,
- Context context) throws IOException, InterruptedException {
+ public void reduce(Writable key, Iterable<Writable> values, Context context)
+ throws IOException, InterruptedException {
for(Writable putOrDelete : values) {
context.write(key, putOrDelete);
}
}
-
-}
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Fri Oct 15 05:33:11 2010
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -141,5 +140,4 @@ implements Configurable {
setScan(scan);
}
-
-}
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Fri Oct 15 05:33:11 2010
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +36,6 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
/**
* A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -189,8 +187,6 @@ extends InputFormat<ImmutableBytesWritab
return true;
}
-
-
/**
* Allows subclasses to get the {@link HTable}.
*/
@@ -235,5 +231,4 @@ extends InputFormat<ImmutableBytesWritab
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
-
-}
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri Oct 15 05:33:11 2010
@@ -26,17 +26,15 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
@@ -46,7 +44,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.conf.Configuration;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -64,13 +61,15 @@ public class TableMapReduceUtil {
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
- * @param job The current job to adjust.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
- Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ Class<? extends Writable> outputValueClass, Job job)
+ throws IOException {
job.setInputFormatClass(TableInputFormat.class);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
@@ -148,10 +147,18 @@ public class TableMapReduceUtil {
*
* @param table The output table.
* @param reducer The reducer class to use.
- * @param job The current job to adjust.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
- * @param quorumAddress Distant cluster to write to
+ * @param quorumAddress Distant cluster to write to; default is null for
+ * output to the cluster that is designated in <code>hbase-site.xml</code>.
+ * Set this String to the zookeeper ensemble of an alternate remote cluster
+ * when you would have the reduce write a cluster that is other than the
+ * default; e.g. copying tables between clusters, the source would be
+ * designated by <code>hbase-site.xml</code> and this param would have the
+ * ensemble address of the remote cluster. The format to pass is particular.
+ * Pass <code> <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT></code>.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
* @throws IOException When determining the region count fails.
@@ -165,12 +172,14 @@ public class TableMapReduceUtil {
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+ // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
if (quorumAddress.split(":").length == 2) {
conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
} else {
- throw new IOException("Please specify the peer cluster as " +
- HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+ // Not in expected format.
+ throw new IOException("Please specify the peer cluster using the format of " +
+ HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
}
}
if (serverClass != null && serverImpl != null) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Fri Oct 15 05:33:11 2010
@@ -23,7 +23,8 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
@@ -34,7 +35,6 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.conf.Configuration;
/**
* Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -43,13 +43,22 @@ import org.apache.hadoop.conf.Configurat
*
* @param <KEY> The type of the key. Ignored in this class.
*/
-public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
+implements Configurable {
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
/** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
- /** Optional job parameter to specify a peer cluster */
+
+ /**
+ * Optional job parameter to specify a peer cluster.
+ * Used specifying remote cluster when copying between hbase clusters (the
+ * source is picked up from <code>hbase-site.xml</code>).
+ * @see {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)}
+ */
public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+
/** Optional specification of the rs class name of the peer cluster */
public static final String
REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
@@ -57,6 +66,11 @@ public class TableOutputFormat<KEY> exte
public static final String
REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
+ /** The configuration. */
+ private Configuration conf = null;
+
+ private HTable table;
+
/**
* Writes the reducer output to an HBase table.
*
@@ -120,32 +134,7 @@ public class TableOutputFormat<KEY> exte
public RecordWriter<KEY, Writable> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
- // expecting exactly one path
- Configuration conf = new Configuration(context.getConfiguration());
- String tableName = conf.get(OUTPUT_TABLE);
- String address = conf.get(QUORUM_ADDRESS);
- String serverClass = conf.get(REGION_SERVER_CLASS);
- String serverImpl = conf.get(REGION_SERVER_IMPL);
- HTable table = null;
- try {
- HBaseConfiguration.addHbaseResources(conf);
- if (address != null) {
- // Check is done in TMRU
- String[] parts = address.split(":");
- conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
- }
- if (serverClass != null) {
- conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
- conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
- }
- table = new HTable(conf, tableName);
- } catch(IOException e) {
- LOG.error(e);
- throw e;
- }
- table.setAutoFlush(false);
- return new TableRecordWriter<KEY>(table);
+ return new TableRecordWriter<KEY>(this.table);
}
/**
@@ -178,4 +167,32 @@ public class TableOutputFormat<KEY> exte
return new TableOutputCommitter();
}
-}
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ String tableName = conf.get(OUTPUT_TABLE);
+ String address = conf.get(QUORUM_ADDRESS);
+ String serverClass = conf.get(REGION_SERVER_CLASS);
+ String serverImpl = conf.get(REGION_SERVER_IMPL);
+ try {
+ if (address != null) {
+ // Check is done in TMRU
+ String[] parts = address.split(":");
+ conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+ }
+ if (serverClass != null) {
+ conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+ conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+ }
+ this.table = new HTable(conf, tableName);
+ table.setAutoFlush(false);
+ LOG.info("Created table instance for " + tableName);
+ } catch(IOException e) {
+ LOG.error(e);
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1022825&r1=1022824&r2=1022825&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri Oct 15 05:33:11 2010
@@ -61,12 +61,12 @@ public class ZKUtil {
private static final char ZNODE_PATH_SEPARATOR = '/';
/**
- * Creates a new connection to ZooKeeper, pulling settings and quorum config
+ * Creates a new connection to ZooKeeper, pulling settings and ensemble config
* from the specified configuration object using methods from {@link ZKConfig}.
*
* Sets the connection status monitoring watcher to the specified watcher.
*
- * @param conf configuration to pull quorum and other settings from
+ * @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
@@ -74,26 +74,26 @@ public class ZKUtil {
public static ZooKeeper connect(Configuration conf, Watcher watcher)
throws IOException {
Properties properties = ZKConfig.makeZKProps(conf);
- String quorum = ZKConfig.getZKQuorumServersString(properties);
- return connect(conf, quorum, watcher);
+ String ensemble = ZKConfig.getZKQuorumServersString(properties);
+ return connect(conf, ensemble, watcher);
}
- public static ZooKeeper connect(Configuration conf, String quorum,
+ public static ZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher)
throws IOException {
- return connect(conf, quorum, watcher, "");
+ return connect(conf, ensemble, watcher, "");
}
- public static ZooKeeper connect(Configuration conf, String quorum,
+ public static ZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher, final String descriptor)
throws IOException {
- if(quorum == null) {
- throw new IOException("Unable to determine ZooKeeper quorum");
+ if(ensemble == null) {
+ throw new IOException("Unable to determine ZooKeeper ensemble");
}
int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
- LOG.info(descriptor + " opening connection to ZooKeeper with quorum (" +
- quorum + ")");
- return new ZooKeeper(quorum, timeout, watcher);
+ LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+ ensemble + ")");
+ return new ZooKeeper(ensemble, timeout, watcher);
}
//
@@ -164,9 +164,9 @@ public class ZKUtil {
* @return ensemble key with a name (if any)
*/
public static String getZooKeeperClusterKey(Configuration conf, String name) {
- String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
+ String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
"[\\t\\n\\x0B\\f\\r]", ""));
- StringBuilder builder = new StringBuilder(quorum);
+ StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {