You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/09/23 07:04:13 UTC
svn commit: r698089 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/mapred/
src/test/org/apache/hadoop/hbase/mapred/
Author: stack
Date: Mon Sep 22 22:04:12 2008
New Revision: 698089
URL: http://svn.apache.org/viewvc?rev=698089&view=rev
Log:
HBASE-885 TableMap and TableReduce should be interfaces
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Sep 22 22:04:12 2008
@@ -1,6 +1,8 @@
HBase Change Log
Release 0.19.0 - Unreleased
INCOMPATIBLE CHANGES
+ HBASE-885 TableMap and TableReduce should be interfaces
+ (Doğacan Güney via Stack)
BUG FIXES
HBASE-891 HRS.validateValuesLength throws IOE, gets caught in the retries
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Mon Sep 22 22:04:12 2008
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
@@ -37,7 +38,9 @@
/**
* Extract grouping columns from input record
*/
-public class GroupingTableMap extends TableMap<ImmutableBytesWritable,RowResult> {
+public class GroupingTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable,RowResult> {
/**
* JobConf parameter to specify the columns used to produce the key passed to
@@ -63,7 +66,8 @@
public static void initJob(String table, String columns, String groupColumns,
Class<? extends TableMap> mapper, JobConf job) {
- initJob(table, columns, mapper, ImmutableBytesWritable.class, RowResult.class, job);
+ TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+ ImmutableBytesWritable.class, RowResult.class, job);
job.set(GROUP_COLUMNS, groupColumns);
}
@@ -83,7 +87,6 @@
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
*/
- @Override
public void map(@SuppressWarnings("unused") ImmutableBytesWritable key,
RowResult value, OutputCollector<ImmutableBytesWritable,RowResult> output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Mon Sep 22 22:04:12 2008
@@ -24,13 +24,16 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Pass the given key and record as-is to reduce
*/
-public class IdentityTableMap extends TableMap<ImmutableBytesWritable, RowResult> {
+public class IdentityTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult> {
/** constructor */
public IdentityTableMap() {
@@ -49,14 +52,14 @@
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns,
Class<? extends TableMap> mapper, JobConf job) {
- TableMap.initJob(table, columns, mapper, ImmutableBytesWritable.class,
+ TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+ ImmutableBytesWritable.class,
RowResult.class, job);
}
/**
* Pass the key, value to reduce
*/
- @Override
public void map(ImmutableBytesWritable key, RowResult value,
OutputCollector<ImmutableBytesWritable,RowResult> output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Mon Sep 22 22:04:12 2008
@@ -25,13 +25,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Write to table each key, record pair
*/
-public class IdentityTableReduce extends TableReduce<ImmutableBytesWritable, BatchUpdate> {
+public class IdentityTableReduce
+extends MapReduceBase
+implements TableReduce<ImmutableBytesWritable, BatchUpdate> {
@SuppressWarnings("unused")
private static final Log LOG =
LogFactory.getLog(IdentityTableReduce.class.getName());
@@ -41,7 +44,6 @@
*
* @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
- @Override
public void reduce(ImmutableBytesWritable key, Iterator<BatchUpdate> values,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
@SuppressWarnings("unused") Reporter reporter)
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java Mon Sep 22 22:04:12 2008
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -43,7 +44,9 @@
* Map outputs table rows IF the input row has columns that have content.
* Uses an {@link IdentityReducer}
*/
-public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> implements Tool {
+public class RowCounter
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult>, Tool {
/* Name of this 'program'
*/
static final String NAME = "rowcounter";
@@ -53,7 +56,6 @@
new RowResult(Bytes.toBytes("dummy"),new HbaseMapWritable<byte [], Cell>());
private static enum Counters {ROWS}
- @Override
public void map(ImmutableBytesWritable row, RowResult value,
OutputCollector<ImmutableBytesWritable, RowResult> output,
@SuppressWarnings("unused") Reporter reporter)
@@ -93,7 +95,7 @@
sb.append(args[i]);
}
// Second argument is the table name.
- TableMap.initJob(args[1], sb.toString(), this.getClass(),
+ TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), this.getClass(),
ImmutableBytesWritable.class, RowResult.class, c);
c.setReducerClass(IdentityReducer.class);
// First arg is the output directory.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Mon Sep 22 22:04:12 2008
@@ -19,18 +19,11 @@
*/
package org.apache.hadoop.hbase.mapred;
-import java.io.IOException;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
/**
* Scan an HBase table to sort by a specified sort column.
@@ -40,42 +33,7 @@
* @param <V> Writable value class
*/
@SuppressWarnings("unchecked")
-public abstract class TableMap<K extends WritableComparable, V extends Writable>
- extends MapReduceBase implements Mapper<ImmutableBytesWritable, RowResult, K, V> {
- /**
- * Use this before submitting a TableMap job. It will
- * appropriately set up the JobConf.
- *
- * @param table table name
- * @param columns columns to scan
- * @param mapper mapper class
- * @param outputKeyClass
- * @param outputValueClass
- * @param job job configuration
- */
- public static void initJob(String table, String columns,
- Class<? extends TableMap> mapper,
- Class<? extends WritableComparable> outputKeyClass,
- Class<? extends Writable> outputValueClass, JobConf job) {
-
- job.setInputFormat(TableInputFormat.class);
- job.setMapOutputValueClass(outputValueClass);
- job.setMapOutputKeyClass(outputKeyClass);
- job.setMapperClass(mapper);
- FileInputFormat.addInputPaths(job, table);
- job.set(TableInputFormat.COLUMN_LIST, columns);
- }
+public interface TableMap<K extends WritableComparable, V extends Writable>
+extends Mapper<ImmutableBytesWritable, RowResult, K, V> {
- /**
- * Call a user defined function on a single HBase record, represented
- * by a key and its associated record value.
- *
- * @param key
- * @param value
- * @param output
- * @param reporter
- * @throws IOException
- */
- public abstract void map(ImmutableBytesWritable key, RowResult value,
- OutputCollector<K, V> output, Reporter reporter) throws IOException;
}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=698089&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Mon Sep 22 22:04:12 2008
@@ -0,0 +1,52 @@
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("unchecked")
+public class TableMapReduceUtil {
+ /**
+ * Use this before submitting a TableMap job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table table name
+ * @param columns columns to scan
+ * @param mapper mapper class
+ * @param outputKeyClass
+ * @param outputValueClass
+ * @param job job configuration
+ */
+ public static void initTableMapJob(String table, String columns,
+ Class<? extends TableMap> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, JobConf job) {
+
+ job.setInputFormat(TableInputFormat.class);
+ job.setMapOutputValueClass(outputValueClass);
+ job.setMapOutputKeyClass(outputKeyClass);
+ job.setMapperClass(mapper);
+ FileInputFormat.addInputPaths(job, table);
+ job.set(TableInputFormat.COLUMN_LIST, columns);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initTableReduceJob(String table,
+ Class<? extends TableReduce> reducer, JobConf job) {
+ job.setOutputFormat(TableOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(TableOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Mon Sep 22 22:04:12 2008
@@ -19,18 +19,11 @@
*/
package org.apache.hadoop.hbase.mapred;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
/**
* Write a table, sorting by the input key
@@ -39,34 +32,7 @@
* @param <V> value class
*/
@SuppressWarnings("unchecked")
-public abstract class TableReduce<K extends WritableComparable, V extends Writable>
- extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
- /**
- * Use this before submitting a TableReduce job. It will
- * appropriately set up the JobConf.
- *
- * @param table
- * @param reducer
- * @param job
- */
- public static void initJob(String table,
- Class<? extends TableReduce> reducer, JobConf job) {
- job.setOutputFormat(TableOutputFormat.class);
- job.setReducerClass(reducer);
- job.set(TableOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(BatchUpdate.class);
- }
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
- /**
- *
- * @param key
- * @param values
- * @param output
- * @param reporter
- * @throws IOException
- */
- public abstract void reduce(K key, Iterator<V> values,
- OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
- throws IOException;
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Mon Sep 22 22:04:12 2008
@@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
@@ -73,12 +74,13 @@
/**
* Pass the given key and processed record reduce
*/
- public static class ProcessContentsMapper extends TableMap<ImmutableBytesWritable, BatchUpdate> {
+ public static class ProcessContentsMapper
+ extends MapReduceBase
+ implements TableMap<ImmutableBytesWritable, BatchUpdate> {
/**
* Pass the key, and reversed value to reduce
*/
@SuppressWarnings("unchecked")
- @Override
public void map(ImmutableBytesWritable key, RowResult value,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
@SuppressWarnings("unused") Reporter reporter)
@@ -127,10 +129,10 @@
jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumReduceTasks(1);
- TableMap.initJob(Bytes.toString(table.getTableName()), INPUT_COLUMN,
- ProcessContentsMapper.class, ImmutableBytesWritable.class,
- BatchUpdate.class, jobConf);
- TableReduce.initJob(Bytes.toString(table.getTableName()),
+ TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
+ INPUT_COLUMN, ProcessContentsMapper.class,
+ ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
+ TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
IdentityTableReduce.class, jobConf);
LOG.info("Started " + table.getTableName());