You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/06/30 13:11:37 UTC

svn commit: r552127 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/mapred/ src/test/ src/test/org/apache/hadoop/hbase/

Author: jimk
Date: Sat Jun 30 04:11:32 2007
New Revision: 552127

URL: http://svn.apache.org/viewvc?view=rev&rev=552127
Log:
HADOOP-1519 map/reduce interface for HBase

AbstractMergeTestBase, HBaseTestCase: move createNewHRegion to HBaseTestCase
MiniHBaseCluster: add deleteOnExit, getDFSCluster, fix Javadoc
TestScanner2: moved KeyedData to org.apache.hadoop.hbase.io
TestTableMapReduce: new test case to test map/reduce interface to HBase
hbase-site.xml: change hbase.client.pause from 3 to 5 seconds, hbase.client.retries.number to 5 so that tests will not time out or run out of retries
HClient: moved KeyedData to org.apache.hadoop.hbase.io, fix javadoc, add method getStartKeys
HMaster: moved KeyedData to org.apache.hadoop.hbase.io, remove unused variables, remove extraneous throws clause, 
HRegionInterface, HRegionServer: moved KeyedData to org.apache.hadoop.hbase.io
KeyedData: moved KeyedData to org.apache.hadoop.hbase.io
KeyedDataArrayWritable: new class to support HBase map/reduce
org.apache.hadoop.hbase.mapred: new classes for map/reduce
- GroupingTableMap
- IdentityTableMap
- IdentityTableReduce
- TableInputFormat
- TableMap
- TableOutputCollector
- TableOutputFormat
- TableReduce
- TableSplit
hbase/bin/hbase: changes for map/reduce

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/KeyedData.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Jun 30 04:11:32 2007
@@ -43,3 +43,4 @@
  25. HADOOP-1537. Catch exceptions in testCleanRegionServerExit so we can see
      what is failing.
  26. HADOOP-1543 [hbase] Add HClient.tableExists
+ 27. HADOOP-1519 [hbase] map/reduce interface for HBase

Modified: lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/bin/hbase?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/bin/hbase (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/bin/hbase Sat Jun 30 04:11:32 2007
@@ -82,10 +82,10 @@
 
 # CLASSPATH initially contains $HBASE_CONF_DIR
 # Add HADOOP_CONF_DIR if its been defined.
-CLASSPATH="${HBASE_CONF_DIR}"
 if [ ! "$HADOOP_CONF_DIR" = "" ]; then
     CLASSPATH="${CLASSPATH}:${HADOOP_CONF_DIR}"
 fi
+CLASSPATH="${CLASSPATH}:${HBASE_CONF_DIR}"
 CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
 
 # for developers, add hbase and hadoop classes to CLASSPATH
@@ -112,13 +112,13 @@
 for f in  "$HBASE_HOME/hadoop-hbase-*.jar"; do
   CLASSPATH=${CLASSPATH}:$f;
 done
-for f in  "$HADOOP_HOME/build/contrib/hbase/hadoop-hbase-*.jar"; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
+if [ -f "$HADOOP_HOME/contrib/hadoop-hbase.jar" ]; then
+  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/contrib/hadoop-hbase.jar
+fi
 if [ -d "$HADOOP_HOME/webapps" ]; then
   CLASSPATH=${CLASSPATH}:$HADOOP_HOME
 fi
-for f in $HADOOP_HOME/hadoop-*-core.jar; do
+for f in $HADOOP_HOME/hadoop-*.jar; do
   CLASSPATH=${CLASSPATH}:$f;
 done
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Sat Jun 30 04:11:32 2007
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -78,10 +79,16 @@
         this.regionInfo;
     }
     
+    /**
+     * @return HRegionInfo
+     */
     public HRegionInfo getRegionInfo(){
       return regionInfo;
     }
 
+    /**
+     * @return HServerAddress
+     */
     public HServerAddress getServerAddress(){
       return serverAddress;
     }
@@ -589,6 +596,23 @@
   }
   
   /**
+   * Gets the starting row key for every region in the currently open table
+   * @return Array of region starting row keys
+   */
+  public synchronized Text[] getStartKeys() {
+    if(this.tableServers == null) {
+      throw new IllegalStateException("Must open table first");
+    }
+
+    Text[] keys = new Text[tableServers.size()];
+    int i = 0;
+    for(Text key: tableServers.keySet()){
+      keys[i++] = key;
+    }
+    return keys;
+  }
+  
+  /**
    * Gets the servers of the given table.
    * 
    * @param tableName - the table to be located
@@ -1360,6 +1384,7 @@
     private Text startRow;
     private boolean closed;
     private RegionLocation[] regions;
+    @SuppressWarnings("hiding")
     private int currentRegion;
     private HRegionInterface server;
     private long scannerId;

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Jun 30 04:11:32 2007
@@ -37,6 +37,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
@@ -1854,7 +1855,6 @@
               
               try {
                 DataInputBuffer inbuf = new DataInputBuffer();
-                byte[] bytes;
                 while(true) {
                   HRegionInfo info = new HRegionInfo();
                   String serverName = null;
@@ -1978,8 +1978,7 @@
     
     @Override
     protected void processScanItem(String serverName, long startCode,
-        HRegionInfo info)
-    throws IOException {
+        HRegionInfo info) {
       if (isBeingServed(serverName, startCode)) {
         TreeSet<HRegionInfo> regions = servedRegions.get(serverName);
         if (regions == null) {
@@ -2260,6 +2259,7 @@
 
   /** Instantiated to monitor the health of a region server */
   private class ServerExpirer implements LeaseListener {
+    @SuppressWarnings("hiding")
     private String server;
     
     ServerExpirer(String server) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Sat Jun 30 04:11:32 2007
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Jun 30 04:11:32 2007
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.RPC;

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * KeyedData is just a data pair.
+ * It includes an HStoreKey and some associated data.
+ ******************************************************************************/
+public class KeyedData implements Writable {
+  HStoreKey key;
+  byte [] data;
+
+  /** Default constructor. Used by Writable interface */
+  public KeyedData() {
+    this.key = new HStoreKey();
+  }
+
+  /**
+   * Create a KeyedData object specifying the parts
+   * @param key HStoreKey
+   * @param data
+   */
+  public KeyedData(HStoreKey key, byte [] data) {
+    this.key = key;
+    this.data = data;
+  }
+
+  /** @return returns the key */
+  public HStoreKey getKey() {
+    return key;
+  }
+
+  /** @return - returns the value */
+  public byte [] getData() {
+    return data;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  public void write(DataOutput out) throws IOException {
+    key.write(out);
+    out.writeInt(this.data.length);
+    out.write(this.data);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  public void readFields(DataInput in) throws IOException {
+    key.readFields(in);
+    this.data = new byte[in.readInt()];
+    in.readFully(this.data);
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wraps an array of KeyedData items as a Writable. The array elements
+ * may be null.
+ */
+public class KeyedDataArrayWritable implements Writable {
+
+  private final static KeyedData NULL_KEYEDDATA = new KeyedData();
+
+  private KeyedData[] m_data;
+
+  /**
+   * Make a record of length 0
+   */
+  public KeyedDataArrayWritable() {
+    m_data = new KeyedData[0];
+  }
+
+  /** @return the array of KeyedData */
+  public KeyedData[] get() {
+    return m_data; 
+  }
+
+  /**
+   * Sets the KeyedData array
+   * 
+   * @param data array of KeyedData
+   */
+  public void set(KeyedData[] data) {
+    if(data == null) {
+      throw new NullPointerException("KeyedData[] cannot be null");
+    }
+    m_data = data;
+  }
+
+  // Writable
+  
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    m_data = new KeyedData[len];
+    for(int i = 0; i < len; i++) {
+      m_data[i] = new KeyedData();
+      m_data[i].readFields(in);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    int len = m_data.length;
+    out.writeInt(len);
+    for(int i = 0; i < len; i++) {
+      if(m_data[i] != null) {
+        m_data[i].write(out);
+      } else {
+        NULL_KEYEDDATA.write(out);
+      }
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,152 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Extract grouping columns from input record
+ */
+public class GroupingTableMap extends TableMap {
+
+  /**
+   * JobConf parameter to specify the columns used to produce the key passed to 
+   * collect from the map phase
+   */
+  public static final String GROUP_COLUMNS =
+    "hbase.mapred.groupingtablemap.columns";
+  
+  private Text[] m_columns;
+
+  /** default constructor */
+  public GroupingTableMap() {
+    super();
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up the
+   * JobConf.
+   *
+   * @param table table to be processed
+   * @param columns space separated list of columns to fetch
+   * @param groupColumns space separated list of columns used to form the key used in collect
+   * @param mapper map class
+   * @param job job configuration object
+   */
+  public static void initJob(String table, String columns, String groupColumns, 
+      Class<? extends TableMap> mapper, JobConf job) {
+    
+    initJob(table, columns, mapper, job);
+    job.set(GROUP_COLUMNS, groupColumns);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
+    m_columns = new Text[cols.length];
+    for(int i = 0; i < cols.length; i++) {
+      m_columns[i] = new Text(cols[i]);
+    }
+  }
+
+  /**
+   * Extract the grouping columns from value to construct a new key.
+   * 
+   * Pass the new key and value to reduce.
+   * If any of the grouping columns are not found in the value, the record is skipped.
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void map(@SuppressWarnings("unused") HStoreKey key,
+      KeyedDataArrayWritable value, TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+    
+    byte[][] keyVals = extractKeyValues(value);
+    if(keyVals != null) {
+      Text tKey = createGroupKey(keyVals);
+      output.collect(tKey, value);
+    }
+  }
+
+  /**
+   * Extract columns values from the current record. This method returns
+   * null if any of the columns are not found.
+   * 
+   * Override this method if you want to deal with nulls differently.
+   * 
+   * @param r
+   * @return array of byte values
+   */
+  protected byte[][] extractKeyValues(KeyedDataArrayWritable r) {
+    byte[][] keyVals = null;
+    ArrayList<byte[]> foundList = new ArrayList<byte[]>();
+    int numCols = m_columns.length;
+    if(numCols > 0) {
+      KeyedData[] recVals = r.get();
+      boolean found = true;
+      for(int i = 0; i < numCols && found; i++) {
+        found = false;
+        for(int j = 0; j < recVals.length; j++) {
+          if(recVals[j].getKey().getColumn().equals(m_columns[i])) {
+            found = true;
+            byte[] val = recVals[j].getData();
+            foundList.add(val);
+            break;
+          }
+        }
+      }
+      if(foundList.size() == numCols) {
+        keyVals = foundList.toArray(new byte[numCols][]);
+      }
+    }
+    return keyVals;
+  }
+
+  /**
+   * Create a key by concatenating multiple column values. 
+   * Override this function in order to produce different types of keys.
+   * 
+   * @param vals
+   * @return key generated by concatenating multiple column values
+   */
+  protected Text createGroupKey(byte[][] vals) {
+    if(vals == null) {
+      return null;
+    }
+    StringBuilder sb =  new StringBuilder();
+    for(int i = 0; i < vals.length; i++) {
+      if(i > 0) {
+        sb.append(" ");
+      }
+      sb.append(new String(vals[i]));
+    }
+    return new Text(sb.toString());
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Pass the given key and record as-is to reduce
+ */
+public class IdentityTableMap extends TableMap {
+
+  /** constructor */
+  public IdentityTableMap() {
+    super();
+  }
+
+  /**
+   * Pass the key, value to reduce
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void map(HStoreKey key, KeyedDataArrayWritable value,
+      TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+    
+    Text tKey = key.getRow();
+    output.collect(tKey, value);
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Write to table each key, record pair
+ */
+public class IdentityTableReduce extends TableReduce {
+
+  /** constructor */
+  public IdentityTableReduce() {
+    super();
+  }
+
+  /**
+   * No aggregation, output pairs of (key, record)
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
+      TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+    
+    while(values.hasNext()) {
+      KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next();
+      output.collect(key, r);
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.hbase.HClient;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce
+ */
+public class TableInputFormat implements InputFormat, JobConfigurable {
+  static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
+
+  /**
+   * space delimited list of columns 
+   * @see org.apache.hadoop.hbase.HAbstractScanner for column name wildcards
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+  
+  private Text m_tableName;
+  Text[] m_cols;
+  HClient m_client;
+
+  /**
+   * Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
+   */
+  class TableRecordReader implements RecordReader {
+    private HScannerInterface m_scanner;
+    private TreeMap<Text, byte[]> m_row; // current buffer
+    private Text m_endRow;
+
+    /**
+     * Constructor
+     * @param startRow (inclusive)
+     * @param endRow (exclusive)
+     * @throws IOException
+     */
+    public TableRecordReader(Text startRow, Text endRow) throws IOException {
+      LOG.debug("start construct");
+      m_row = new TreeMap<Text, byte[]>();
+      m_scanner = m_client.obtainScanner(m_cols, startRow);
+      m_endRow = endRow;
+      LOG.debug("end construct");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#close()
+     */
+    public void close() throws IOException {
+      LOG.debug("start close");
+      m_scanner.close();
+      LOG.debug("end close");
+    }
+
+    /**
+     * @return HStoreKey
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public WritableComparable createKey() {
+      return new HStoreKey();
+    }
+
+    /**
+     * @return KeyedDataArrayWritable of KeyedData
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public Writable createValue() {
+      return new KeyedDataArrayWritable();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#getPos()
+     */
+    public long getPos() {
+      // This should be the ordinal tuple in the range; 
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#getProgress()
+     */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key HStoreKey as input key.
+     * @param value KeyedDataArrayWritable as input value
+     * 
+     * Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to
+     *                                (HStoreKey, KeyedDataArrayWritable)
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(Writable key, Writable value) throws IOException {
+      LOG.debug("start next");
+      m_row.clear();
+      HStoreKey tKey = (HStoreKey)key;
+      boolean hasMore = m_scanner.next(tKey, m_row);
+
+      if(hasMore) {
+        if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
+          hasMore = false;
+        } else {
+          KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value;
+          ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
+
+          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
+            HStoreKey keyCol = new HStoreKey(tKey);
+            keyCol.setColumn(e.getKey());
+            columns.add(new KeyedData(keyCol, e.getValue()));
+          }
+
+          // set the output
+          rowVal.set(columns.toArray(new KeyedData[columns.size()]));
+        }
+      }
+      LOG.debug("end next");
+      return hasMore;
+    }
+
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(org.apache.hadoop.mapred.InputSplit, org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.Reporter)
+   */
+  public RecordReader getRecordReader(InputSplit split,
+      @SuppressWarnings("unused") JobConf job,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+    
+    TableSplit tSplit = (TableSplit)split;
+    return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
+  }
+
+  /**
+   * A split will be created for each HRegion of the input table
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  @SuppressWarnings("unused")
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    LOG.debug("start getSplits");
+
+    Text[] startKeys = m_client.getStartKeys();
+    if(startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for(int i = 0; i < startKeys.length; i++) {
+      splits[i] = new TableSplit(m_tableName, startKeys[i],
+          ((i + 1) < startKeys.length) ? startKeys[i + 1] : new Text());
+      LOG.debug("split: " + i + "->" + splits[i]);
+    }
+    LOG.debug("end splits");
+    return splits;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    LOG.debug("start configure");
+    Path[] tableNames = job.getInputPaths();
+    m_tableName = new Text(tableNames[0].getName());
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    m_cols = new Text[colNames.length];
+    for(int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = new Text(colNames[i]);
+    }
+    m_client = new HClient(job);
+    try {
+      m_client.openTable(m_tableName);
+    } catch(Exception e) {
+      LOG.error(e);
+    }
+    LOG.debug("end configure");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputFormat#validateInput(org.apache.hadoop.mapred.JobConf)
+   */
+  public void validateInput(JobConf job) throws IOException {
+
+    // expecting exactly one path
+    
+    Path[] tableNames = job.getInputPaths();
+    if(tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    
+    String colArg = job.get(COLUMN_LIST);
+    if(colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,112 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Scan an HBase table to sort by a specified sort column.
+ * If the column does not exist, the record is not passed to Reduce.
+ *
+ */
+public abstract class TableMap extends MapReduceBase implements Mapper {
+
+  private static final Logger LOG = Logger.getLogger(TableMap.class.getName());
+
+  private TableOutputCollector m_collector;
+
+  /** constructor*/
+  public TableMap() {
+    m_collector = new TableOutputCollector();
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table table name
+   * @param columns columns to scan
+   * @param mapper mapper class
+   * @param job job configuration
+   */
+  public static void initJob(String table, String columns, 
+      Class<? extends TableMap> mapper, JobConf job) {
+    
+    job.setInputFormat(TableInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(KeyedDataArrayWritable.class);
+    job.setMapperClass(mapper);
+    job.setInputPath(new Path(table));
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+  }
+
+  /**
+   * Input:
+   * @param key is of type HStoreKey
+   * @param value is of type KeyedDataArrayWritable
+   * @param output output collector
+   * @param reporter object to use for status updates
+   * @throws IOException
+   * 
+   * Output:
+   * The key is a specific column, including the input key or any value
+   * The value is of type LabeledData
+   */
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+    
+    LOG.debug("start map");
+    if(m_collector.collector == null) {
+      m_collector.collector = output;
+    }
+    map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter);
+    LOG.debug("end map");
+  }
+
+  /**
+   * Call a user defined function on a single HBase record, represented
+   * by a key and its associated record value.
+   * 
+   * @param key
+   * @param value
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void map(HStoreKey key, KeyedDataArrayWritable value, 
+      TableOutputCollector output, Reporter reporter) throws IOException;
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+/**
+ * Refine the types that can be collected from a Table Map/Reduce jobs.
+ */
+public class TableOutputCollector {
+  /** The collector object */
+  public OutputCollector collector;
+
+  /**
+   * Restrict Table Map/Reduce's output to be a Text key and a record.
+   * 
+   * @param key
+   * @param value
+   * @throws IOException
+   */
+  public void collect(Text key, KeyedDataArrayWritable value)
+  throws IOException {
+    collector.collect(key, value);
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.hadoop.hbase.HClient;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+public class TableOutputFormat extends OutputFormatBase {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+  static final Logger LOG = Logger.getLogger(TableOutputFormat.class.getName());
+
+  /** constructor */
+  public TableOutputFormat() {}
+
+  /**
+   * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) 
+   * and write to an HBase table
+   */
+  protected class TableRecordWriter implements RecordWriter {
+    private HClient m_client;
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HClient for writing.
+     * 
+     * @param client
+     */
+    public TableRecordWriter(HClient client) {
+      m_client = client;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordWriter#close(org.apache.hadoop.mapred.Reporter)
+     */
+    public void close(@SuppressWarnings("unused") Reporter reporter) {}
+
+    /**
+     * Expect key to be of type Text
+     * Expect value to be of type KeyedDataArrayWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+     */
+    public void write(WritableComparable key, Writable value) throws IOException {
+      LOG.debug("start write");
+      Text tKey = (Text)key;
+      KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value;
+      KeyedData[] columns = tValue.get();
+
+      // start transaction
+      
+      long xid = m_client.startUpdate(tKey);
+      
+      for(int i = 0; i < columns.length; i++) {
+        KeyedData column = columns[i];
+        m_client.put(xid, column.getKey().getColumn(), column.getData());
+      }
+      
+      // end transaction
+      
+      m_client.commit(xid);
+
+      LOG.debug("end write");
+    }
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
+   */
+  @Override
+  @SuppressWarnings("unused")
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+    
+    // expecting exactly one path
+    
+    LOG.debug("start get writer");
+    Text tableName = new Text(job.get(OUTPUT_TABLE));
+    HClient client = null;
+    try {
+      client = new HClient(job);
+      client.openTable(tableName);
+    } catch(Exception e) {
+      LOG.error(e);
+    }
+    LOG.debug("end get writer");
+    return new TableRecordWriter(client);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.OutputFormatBase#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  @SuppressWarnings("unused")
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+    
+    String tableName = job.get(OUTPUT_TABLE);
+    if(tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+/**
+ * Write a table, sorting by the input key
+ */
+public abstract class TableReduce extends MapReduceBase implements Reducer {
+  private static final Logger LOG =
+    Logger.getLogger(TableReduce.class.getName());
+
+  TableOutputCollector m_collector;
+
+  /** Constructor */
+  public TableReduce() {
+    m_collector = new TableOutputCollector();
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table, Class<? extends TableReduce> reducer,
+      JobConf job) {
+    
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+  }
+
+  /**
+   * Create a unique key for table insertion by appending a local
+   * counter the given key.
+   *
+   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @SuppressWarnings("unchecked")
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    LOG.debug("start reduce");
+    if(m_collector.collector == null) {
+      m_collector.collector = output;
+    }
+    reduce((Text)key, values, m_collector, reporter);
+    LOG.debug("end reduce");
+  }
+
+  /**
+   * 
+   * @param key
+   * @param values
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public abstract void reduce(Text key, Iterator values, 
+      TableOutputCollector output, Reporter reporter) throws IOException;
+
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+public class TableSplit implements InputSplit {
+  private Text m_tableName;
+  private Text m_startRow;
+  private Text m_endRow;
+
+  /** default constructor */
+  public TableSplit() {
+    m_tableName = new Text();
+    m_startRow = new Text();
+    m_endRow = new Text();
+  }
+
+  /**
+   * Constructor
+   * @param tableName
+   * @param startRow
+   * @param endRow
+   */
+  public TableSplit(Text tableName, Text startRow, Text endRow) {
+    this();
+    m_tableName.set(tableName);
+    m_startRow.set(startRow);
+    m_endRow.set(endRow);
+  }
+
+  /** @return table name */
+  public Text getTableName() {
+    return m_tableName;
+  }
+
+  /** @return starting row key */
+  public Text getStartRow() {
+    return m_startRow;
+  }
+
+  /** @return end row key */
+  public Text getEndRow() {
+    return m_endRow;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputSplit#getLength()
+   */
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputSplit#getLocations()
+   */
+  public String[] getLocations() {
+    // Return a random node from the cluster for now
+    return new String[] { };
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  public void readFields(DataInput in) throws IOException {
+    m_tableName.readFields(in);
+    m_startRow.readFields(in);
+    m_endRow.readFields(in);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  public void write(DataOutput out) throws IOException {
+    m_tableName.write(out);
+    m_startRow.write(out);
+    m_endRow.write(out);
+  }
+
+  @Override
+  public String toString() {
+    return m_tableName +"," + m_startRow + "," + m_endRow;
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml Sat Jun 30 04:11:32 2007
@@ -11,13 +11,13 @@
   </property>
   <property>
     <name>hbase.client.pause</name>
-    <value>3000</value>
+    <value>5000</value>
     <description>General client pause value.  Used mostly as value to wait
     before running a retry of a failed get, region lookup, etc.</description>
   </property>
   <property>
     <name>hbase.client.retries.number</name>
-    <value>2</value>
+    <value>5</value>
     <description>Maximum retries.  Used as maximum for all retryable
     operations such as fetching of the root region from root region
     server, getting a cell's value, starting a row update, etc.

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java Sat Jun 30 04:11:32 2007
@@ -19,7 +19,6 @@
 import java.io.UnsupportedEncodingException;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -141,17 +140,4 @@
     return region;
   }
 
-  private HRegion createNewHRegion(FileSystem fs, Path dir,
-      Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
-      Text endKey) throws IOException {
-    
-    HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
-    Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
-    fs.mkdirs(regionDir);
-
-    return new HRegion(dir,
-      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, null);
-  }
-  
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Sat Jun 30 04:11:32 2007
@@ -15,10 +15,14 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.IOException;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 
 /**
  * Abstract base class for test cases. Performs all static initialization
@@ -43,4 +47,18 @@
   protected Path getUnitTestdir(String testName) {
     return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
   }
+
+  protected HRegion createNewHRegion(FileSystem fs, Path dir,
+      Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
+      Text endKey) throws IOException {
+    
+    HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
+    Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
+    fs.mkdirs(regionDir);
+
+    return new HRegion(dir,
+      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
+      fs, conf, info, null);
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Sat Jun 30 04:11:32 2007
@@ -41,7 +41,8 @@
   private Thread masterThread;
   List<HRegionServer> regionServers;
   List<Thread> regionThreads;
-  
+  private boolean deleteOnExit = true;
+
   /**
    * Starts a MiniHBaseCluster on top of a new MiniDFSCluster
    * 
@@ -51,9 +52,23 @@
    */
   public MiniHBaseCluster(Configuration conf, int nRegionNodes)
   throws IOException {
-    this(conf, nRegionNodes, true);
+    this(conf, nRegionNodes, true, true, true);
+  }
+
+  /**
+   * Start a MiniHBaseCluster. Use the native file system unless
+   * miniHdfsFilesystem is set to true.
+   * 
+   * @param conf
+   * @param nRegionNodes
+   * @param miniHdfsFilesystem
+   * @throws IOException
+   */
+  public MiniHBaseCluster(Configuration conf, int nRegionNodes,
+      final boolean miniHdfsFilesystem) throws IOException {
+    this(conf, nRegionNodes, miniHdfsFilesystem, true, true);
   }
-  
+
   /**
    * Starts a MiniHBaseCluster on top of an existing HDFSCluster
    * 
@@ -70,7 +85,7 @@
     this.cluster = dfsCluster;
     init(nRegionNodes);
   }
-  
+
   /**
    * Constructor.
    * @param conf
@@ -78,16 +93,20 @@
    * @param miniHdfsFilesystem If true, set the hbase mini
    * cluster atop a mini hdfs cluster.  Otherwise, use the
    * filesystem configured in <code>conf</code>.
+   * @param format the mini hdfs cluster
+   * @param deleteOnExit clean up mini hdfs files
    * @throws IOException 
    */
   public MiniHBaseCluster(Configuration conf, int nRegionNodes,
-      final boolean miniHdfsFilesystem)
+      final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit) 
   throws IOException {
     this.conf = conf;
-    
+    this.deleteOnExit = deleteOnExit;
+
     if (miniHdfsFilesystem) {
       try {
-        this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
+        this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
+
       } catch(Throwable t) {
         LOG.error("Failed setup of mini dfs cluster", t);
         t.printStackTrace();
@@ -112,7 +131,7 @@
       if(this.conf.get(MASTER_ADDRESS) == null) {
         this.conf.set(MASTER_ADDRESS, "localhost:0");
       }
-      
+
       // Create the master
       this.master = new HMaster(conf);
       this.masterThread = new Thread(this.master, "HMaster");
@@ -120,7 +139,7 @@
       // Start up the master
       LOG.info("Starting HMaster");
       masterThread.start();
-      
+
       // Set the master's port for the HRegionServers
       String address = master.getMasterAddress().toString();
       this.conf.set(MASTER_ADDRESS, address);
@@ -137,15 +156,24 @@
     }
   }
 
+  /**
+   * Get the cluster on which this HBase cluster is running
+   * 
+   * @return MiniDFSCluster
+   */
+  public MiniDFSCluster getDFSCluster() {
+    return cluster;
+  }
+
   private void startRegionServers(final int nRegionNodes)
-      throws IOException {
+  throws IOException {
     this.regionServers = new ArrayList<HRegionServer>(nRegionNodes);
     this.regionThreads = new ArrayList<Thread>(nRegionNodes);    
     for(int i = 0; i < nRegionNodes; i++) {
       startRegionServer();
     }
   }
-  
+
   void startRegionServer() throws IOException {
     HRegionServer hsr = new HRegionServer(this.conf);
     this.regionServers.add(hsr);
@@ -153,7 +181,7 @@
     t.start();
     this.regionThreads.add(t);
   }
-  
+
   /** 
    * @return Returns the rpc address actually used by the master server, because
    * the supplied port is not necessarily the actual port used.
@@ -161,7 +189,7 @@
   public HServerAddress getHMasterAddress() {
     return master.getMasterAddress();
   }
-  
+
   /**
    * Shut down the specified region server cleanly
    * 
@@ -170,15 +198,20 @@
   public void stopRegionServer(int serverNumber) {
     if (serverNumber >= regionServers.size()) {
       throw new ArrayIndexOutOfBoundsException(
-          "serverNumber > number of region servers");
+      "serverNumber > number of region servers");
     }
     this.regionServers.get(serverNumber).stop();
   }
-  
+
+  /**
+   * Wait for the specified region server to stop
+   * 
+   * @param serverNumber
+   */
   public void waitOnRegionServer(int serverNumber) {
     if (serverNumber >= regionServers.size()) {
       throw new ArrayIndexOutOfBoundsException(
-          "serverNumber > number of region servers");
+      "serverNumber > number of region servers");
     }
     try {
       this.regionThreads.get(serverNumber).join();
@@ -186,7 +219,7 @@
       e.printStackTrace();
     }
   }
-  
+
   /**
    * Cause a region server to exit without cleaning up
    * 
@@ -195,11 +228,11 @@
   public void abortRegionServer(int serverNumber) {
     if(serverNumber >= this.regionServers.size()) {
       throw new ArrayIndexOutOfBoundsException(
-          "serverNumber > number of region servers");
+      "serverNumber > number of region servers");
     }
     this.regionServers.get(serverNumber).abort();
   }
-  
+
   /** Shut down the HBase cluster */
   public void shutdown() {
     LOG.info("Shutting down the HBase Cluster");
@@ -218,6 +251,7 @@
     }
     try {
       masterThread.join();
+
     } catch(InterruptedException e) {
       // continue
     }
@@ -227,12 +261,14 @@
       LOG.info("Shutting down Mini DFS cluster");
       cluster.shutdown();
     }
-    
+
     // Delete all DFS files
-    deleteFile(new File(System.getProperty(
-        StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
+    if(deleteOnExit) {
+      deleteFile(new File(System.getProperty(
+          StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
+    }
   }
-  
+
   private void deleteFile(File f) {
     if(f.isDirectory()) {
       File[] children = f.listFiles();
@@ -242,4 +278,4 @@
     }
     f.delete();
   }
-}
\ No newline at end of file
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Sat Jun 30 04:11:32 2007
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.Text;
 
 /**

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableOutputCollector;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+
+/**
+ * Test Map/Reduce job over HBase tables
+ */
+public class TestTableMapReduce extends HBaseTestCase {
+  static final String TABLE_NAME = "test";
+  static final String INPUT_COLUMN = "contents:";
+  static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+  static final String OUTPUT_COLUMN = "text:";
+  static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+  
+  private Random rand;
+  private HTableDescriptor desc;
+
+  private MiniDFSCluster dfsCluster = null;
+  private FileSystem fs;
+  private Path dir;
+  private MiniHBaseCluster hCluster = null;
+  
+  private byte[][] values = {
+      "0123".getBytes(),
+      "abcd".getBytes(),
+      "wxyz".getBytes(),
+      "6789".getBytes()
+  };
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    rand = new Random();
+    desc = new HTableDescriptor("test");
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+    
+    dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
+    fs = dfsCluster.getFileSystem();
+    dir = new Path("/hbase");
+    fs.mkdirs(dir);
+
+    // create the root and meta regions and insert the data region into the meta
+
+    HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null);
+    HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null);
+    HRegion.addRegionToMETA(root, meta);
+
+    HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), null, null);
+    HRegion.addRegionToMETA(meta, region);
+
+    // insert some data into the test table
+
+    for(int i = 0; i < values.length; i++) {
+      long lockid = region.startUpdate(new Text("row_"
+          + String.format("%1$05d", i)));
+
+      region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
+      region.commit(lockid);
+    }
+
+    region.close();
+    region.getLog().closeAndDelete();
+    meta.close();
+    meta.getLog().closeAndDelete();
+    root.close();
+    root.getLog().closeAndDelete();
+  
+    // Start up HBase cluster
+    
+    hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    
+    if(hCluster != null) {
+      hCluster.shutdown();
+    }
+    
+  }
+
+  /**
+   * Pass the given key and processed record reduce
+   */
+  public static class ProcessContentsMapper extends TableMap {
+
+    /** constructor */
+    public ProcessContentsMapper() {
+      super();
+    }
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+     */
+    @Override
+    public void map(HStoreKey key, KeyedDataArrayWritable value,
+        TableOutputCollector output,
+        @SuppressWarnings("unused") Reporter reporter) throws IOException {
+      
+      Text tKey = key.getRow();
+      KeyedData[] columns = value.get();
+      
+      if(columns.length != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      
+      if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
+        throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+            + " but got: " + columns[0].getKey().getColumn());
+      }
+
+      // Get the input column key and change it to the output column key
+      
+      HStoreKey column = columns[0].getKey();
+      column.setColumn(TEXT_OUTPUT_COLUMN);
+      
+      // Get the original value and reverse it
+      
+      String originalValue = new String(columns[0].getData());
+      StringBuilder newValue = new StringBuilder();
+      for(int i = originalValue.length() - 1; i >= 0; i--) {
+        newValue.append(originalValue.charAt(i));
+      }
+      
+      // Now set the value to be collected
+      
+      columns[0] = new KeyedData(column, newValue.toString().getBytes());
+      value.set(columns);
+      
+      output.collect(tKey, value);
+    }
+  }
+
+  /**
+   * Test HBase map/reduce
+   * @throws IOException
+   */
+  @SuppressWarnings("static-access")
+  public void testTableMapReduce() throws IOException {
+    System.out.println("Print table contents before map/reduce");
+    scanTable(conf);
+    
+    @SuppressWarnings("deprecation")
+    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1);
+
+    try {
+      JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+      jobConf.setJobName("process column contents");
+      jobConf.setNumMapTasks(1);
+      jobConf.setNumReduceTasks(1);
+
+      ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN, 
+          ProcessContentsMapper.class, jobConf);
+
+      IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf);
+
+      JobClient.runJob(jobConf);
+      
+    } finally {
+      mrCluster.shutdown();
+    }
+    
+    System.out.println("Print table contents after map/reduce");
+    scanTable(conf);
+  }
+  
+  private void scanTable(Configuration conf) throws IOException {
+    HClient client = new HClient(conf);
+    client.openTable(new Text(TABLE_NAME));
+    
+    Text[] columns = {
+        TEXT_INPUT_COLUMN,
+        TEXT_OUTPUT_COLUMN
+    };
+    HScannerInterface scanner =
+      client.obtainScanner(columns, HClient.EMPTY_START_ROW);
+    
+    try {
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+      
+      while(scanner.next(key, results)) {
+        System.out.print("row: " + key.getRow());
+        
+        for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+          System.out.print(" column: " + e.getKey() + " value: "
+              + new String(e.getValue()));
+        }
+        System.out.println();
+      }
+      
+    } finally {
+      scanner.close();
+    }
+  }
+}