You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/06/30 13:11:37 UTC
svn commit: r552127 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ bin/
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/
src/java/org/apache/hadoop/hbase/mapred/ src/test/
src/test/org/apache/hadoop/hbase/
Author: jimk
Date: Sat Jun 30 04:11:32 2007
New Revision: 552127
URL: http://svn.apache.org/viewvc?view=rev&rev=552127
Log:
HADOOP-1519 map/reduce interface for HBase
AbstractMergeTestBase, HBaseTestCase: move createNewHRegion to HBaseTestCase
MiniHBaseCluster: add deleteOnExit, getDFSCluster, fix Javadoc
TestScanner2: moved KeyedData to org.apache.hadoop.hbase.io
TestTableMapReduce: new test case to test map/reduce interface to HBase
hbase-site.xml: change hbase.client.pause from 3 to 5 seconds, hbase.client.retries.number to 5 so that tests will not time out or run out of retries
HClient: moved KeyedData to org.apache.hadoop.hbase.io, fix javadoc, add method getStartKeys
HMaster: moved KeyedData to org.apache.hadoop.hbase.io, remove unused variables, remove extraneous throws clause
HRegionInterface, HRegionServer: moved KeyedData to org.apache.hadoop.hbase.io
KeyedData: moved KeyedData to org.apache.hadoop.hbase.io
KeyedDataArrayWritable: new class to support HBase map/reduce
org.apache.hadoop.hbase.mapred: new classes for map/reduce
- GroupingTableMap
- IdentityTableMap
- IdentityTableReduce
- TableInputFormat
- TableMap
- TableOutputCollector
- TableOutputFormat
- TableReduce
- TableSplit
hbase/bin/hbase: changes for map/reduce
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
Removed:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/KeyedData.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Jun 30 04:11:32 2007
@@ -43,3 +43,4 @@
25. HADOOP-1537. Catch exceptions in testCleanRegionServerExit so we can see
what is failing.
26. HADOOP-1543 [hbase] Add HClient.tableExists
+ 27. HADOOP-1519 [hbase] map/reduce interface for HBase
Modified: lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/bin/hbase?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/bin/hbase (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/bin/hbase Sat Jun 30 04:11:32 2007
@@ -82,10 +82,10 @@
# CLASSPATH initially contains $HBASE_CONF_DIR
# Add HADOOP_CONF_DIR if its been defined.
-CLASSPATH="${HBASE_CONF_DIR}"
if [ ! "$HADOOP_CONF_DIR" = "" ]; then
CLASSPATH="${CLASSPATH}:${HADOOP_CONF_DIR}"
fi
+CLASSPATH="${CLASSPATH}:${HBASE_CONF_DIR}"
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
# for developers, add hbase and hadoop classes to CLASSPATH
@@ -112,13 +112,13 @@
for f in "$HBASE_HOME/hadoop-hbase-*.jar"; do
CLASSPATH=${CLASSPATH}:$f;
done
-for f in "$HADOOP_HOME/build/contrib/hbase/hadoop-hbase-*.jar"; do
- CLASSPATH=${CLASSPATH}:$f;
-done
+if [ -f "$HADOOP_HOME/contrib/hadoop-hbase.jar" ]; then
+ CLASSPATH=${CLASSPATH}:$HADOOP_HOME/contrib/hadoop-hbase.jar
+fi
if [ -d "$HADOOP_HOME/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME
fi
-for f in $HADOOP_HOME/hadoop-*-core.jar; do
+for f in $HADOOP_HOME/hadoop-*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Sat Jun 30 04:11:32 2007
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -78,10 +79,16 @@
this.regionInfo;
}
+ /**
+ * @return HRegionInfo
+ */
public HRegionInfo getRegionInfo(){
return regionInfo;
}
+ /**
+ * @return HServerAddress
+ */
public HServerAddress getServerAddress(){
return serverAddress;
}
@@ -589,6 +596,23 @@
}
/**
+ * Gets the starting row key for every region in the currently open table
+ * @return Array of region starting row keys
+ */
+  public synchronized Text[] getStartKeys() {
+    // Region start keys are only known once a table has been opened.
+    if(this.tableServers == null) {
+      throw new IllegalStateException("Must open table first");
+    }
+
+    // Copy the map's keys, in the map's iteration order, into an exactly
+    // sized array.
+    Text[] startKeys = new Text[tableServers.size()];
+    return tableServers.keySet().toArray(startKeys);
+  }
+
+ /**
* Gets the servers of the given table.
*
* @param tableName - the table to be located
@@ -1360,6 +1384,7 @@
private Text startRow;
private boolean closed;
private RegionLocation[] regions;
+ @SuppressWarnings("hiding")
private int currentRegion;
private HRegionInterface server;
private long scannerId;
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Jun 30 04:11:32 2007
@@ -37,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@@ -1854,7 +1855,6 @@
try {
DataInputBuffer inbuf = new DataInputBuffer();
- byte[] bytes;
while(true) {
HRegionInfo info = new HRegionInfo();
String serverName = null;
@@ -1978,8 +1978,7 @@
@Override
protected void processScanItem(String serverName, long startCode,
- HRegionInfo info)
- throws IOException {
+ HRegionInfo info) {
if (isBeingServed(serverName, startCode)) {
TreeSet<HRegionInfo> regions = servedRegions.get(serverName);
if (regions == null) {
@@ -2260,6 +2259,7 @@
/** Instantiated to monitor the health of a region server */
private class ServerExpirer implements LeaseListener {
+ @SuppressWarnings("hiding")
private String server;
ServerExpirer(String server) {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Sat Jun 30 04:11:32 2007
@@ -17,6 +17,7 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Jun 30 04:11:32 2007
@@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * KeyedData is just a data pair.
+ * It includes an HStoreKey and some associated data.
+ ******************************************************************************/
+public class KeyedData implements Writable {
+ HStoreKey key;
+ byte [] data;
+
+ /** Default constructor. Used by Writable interface */
+ public KeyedData() {
+ this.key = new HStoreKey();
+ }
+
+ /**
+ * Create a KeyedData object specifying the parts
+ * @param key HStoreKey
+ * @param data
+ */
+ public KeyedData(HStoreKey key, byte [] data) {
+ this.key = key;
+ this.data = data;
+ }
+
+ /** @return returns the key */
+ public HStoreKey getKey() {
+ return key;
+ }
+
+ /** @return - returns the value */
+ public byte [] getData() {
+ return data;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////////////////////////////////
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ key.write(out);
+ out.writeInt(this.data.length);
+ out.write(this.data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ key.readFields(in);
+ this.data = new byte[in.readInt()];
+ in.readFully(this.data);
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wraps an array of KeyedData items as a Writable. The array elements
+ * may be null.
+ */
+public class KeyedDataArrayWritable implements Writable {
+
+ private final static KeyedData NULL_KEYEDDATA = new KeyedData();
+
+ private KeyedData[] m_data;
+
+ /**
+ * Make a record of length 0
+ */
+ public KeyedDataArrayWritable() {
+ m_data = new KeyedData[0];
+ }
+
+ /** @return the array of KeyedData */
+ public KeyedData[] get() {
+ return m_data;
+ }
+
+ /**
+ * Sets the KeyedData array
+ *
+ * @param data array of KeyedData
+ */
+ public void set(KeyedData[] data) {
+ if(data == null) {
+ throw new NullPointerException("KeyedData[] cannot be null");
+ }
+ m_data = data;
+ }
+
+ // Writable
+
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ m_data = new KeyedData[len];
+ for(int i = 0; i < len; i++) {
+ m_data[i] = new KeyedData();
+ m_data[i].readFields(in);
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ int len = m_data.length;
+ out.writeInt(len);
+ for(int i = 0; i < len; i++) {
+ if(m_data[i] != null) {
+ m_data[i].write(out);
+ } else {
+ NULL_KEYEDDATA.write(out);
+ }
+ }
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,152 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Extract grouping columns from input record
+ */
+public class GroupingTableMap extends TableMap {
+
+  /**
+   * JobConf parameter to specify the columns used to produce the key passed to
+   * collect from the map phase
+   */
+  public static final String GROUP_COLUMNS =
+    "hbase.mapred.groupingtablemap.columns";
+
+  private Text[] m_columns;
+
+  /** default constructor */
+  public GroupingTableMap() {
+    super();
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up the
+   * JobConf.
+   *
+   * @param table table to be processed
+   * @param columns space separated list of columns to fetch
+   * @param groupColumns space separated list of columns used to form the key used in collect
+   * @param mapper map class
+   * @param job job configuration object
+   */
+  public static void initJob(String table, String columns, String groupColumns,
+      Class<? extends TableMap> mapper, JobConf job) {
+
+    initJob(table, columns, mapper, job);
+    job.set(GROUP_COLUMNS, groupColumns);
+  }
+
+  /**
+   * Reads the space separated grouping column names from the job.
+   *
+   * @param job job configuration holding {@link #GROUP_COLUMNS}
+   * @see org.apache.hadoop.hbase.mapred.TableMap#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
+    m_columns = new Text[cols.length];
+    for(int i = 0; i < cols.length; i++) {
+      m_columns[i] = new Text(cols[i]);
+    }
+  }
+
+  /**
+   * Extract the grouping columns from value to construct a new key.
+   *
+   * Pass the new key and value to reduce.
+   * If any of the grouping columns are not found in the value, the record is skipped.
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void map(@SuppressWarnings("unused") HStoreKey key,
+      KeyedDataArrayWritable value, TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+    byte[][] keyVals = extractKeyValues(value);
+    if(keyVals != null) {
+      Text tKey = createGroupKey(keyVals);
+      output.collect(tKey, value);
+    }
+  }
+
+  /**
+   * Extract columns values from the current record. This method returns
+   * null if any of the columns are not found, or if no grouping columns
+   * are configured.
+   *
+   * Override this method if you want to deal with nulls differently.
+   *
+   * @param r record whose cells are searched for the grouping columns
+   * @return array of byte values, one per grouping column in configured
+   * order, or null
+   */
+  protected byte[][] extractKeyValues(KeyedDataArrayWritable r) {
+    int numCols = m_columns.length;
+    if(numCols == 0) {
+      return null;
+    }
+    KeyedData[] recVals = r.get();
+    byte[][] keyVals = new byte[numCols][];
+    for(int i = 0; i < numCols; i++) {
+      boolean found = false;
+      for(int j = 0; j < recVals.length; j++) {
+        KeyedData cell = recVals[j];
+        // The record array may hold null slots; a null slot never matches.
+        if(cell != null && cell.getKey().getColumn().equals(m_columns[i])) {
+          keyVals[i] = cell.getData();
+          found = true;
+          break;
+        }
+      }
+      if(!found) {
+        // One missing grouping column invalidates the whole record.
+        return null;
+      }
+    }
+    return keyVals;
+  }
+
+  /**
+   * Create a key by concatenating multiple column values.
+   * Override this function in order to produce different types of keys.
+   *
+   * Values are decoded as UTF-8 and joined with a single space. An explicit
+   * charset is used so that the same bytes produce the same key on every
+   * task node regardless of its platform default encoding.
+   *
+   * @param vals column values, may be null
+   * @return key generated by concatenating multiple column values, or null
+   * if vals is null
+   */
+  protected Text createGroupKey(byte[][] vals) {
+    if(vals == null) {
+      return null;
+    }
+    StringBuilder sb = new StringBuilder();
+    try {
+      for(int i = 0; i < vals.length; i++) {
+        if(i > 0) {
+          sb.append(" ");
+        }
+        sb.append(new String(vals[i], "UTF-8"));
+      }
+    } catch(java.io.UnsupportedEncodingException e) {
+      // UTF-8 is a charset every Java platform must provide, so this
+      // indicates a broken runtime rather than bad input.
+      throw new IllegalStateException("UTF-8 encoding is not supported", e);
+    }
+    return new Text(sb.toString());
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Mapper that forwards each input record unchanged, keyed by its row.
+ */
+public class IdentityTableMap extends TableMap {
+
+  /** constructor */
+  public IdentityTableMap() {
+    super();
+  }
+
+  /**
+   * Emits the record under the row taken from the input key.
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void map(HStoreKey key, KeyedDataArrayWritable value,
+      TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+    final Text rowName = key.getRow();
+    output.collect(rowName, value);
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Reducer that writes every (key, record) pair it receives to the table.
+ */
+public class IdentityTableReduce extends TableReduce {
+
+  /** constructor */
+  public IdentityTableReduce() {
+    super();
+  }
+
+  /**
+   * Forwards each value for the key as-is; nothing is aggregated.
+   *
+   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  @Override
+  public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
+      TableOutputCollector output,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+    while(values.hasNext()) {
+      // The iterator is untyped; each element is cast to the record type.
+      output.collect(key, (KeyedDataArrayWritable)values.next());
+    }
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.hbase.HClient;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce
+ */
+public class TableInputFormat implements InputFormat, JobConfigurable {
+  static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
+
+  /**
+   * space delimited list of columns
+   * @see org.apache.hadoop.hbase.HAbstractScanner for column name wildcards
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+  private Text m_tableName;
+  Text[] m_cols;
+  HClient m_client;
+
+  /**
+   * Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
+   */
+  class TableRecordReader implements RecordReader {
+    private HScannerInterface m_scanner;
+    private TreeMap<Text, byte[]> m_row; // current buffer
+    private Text m_endRow;
+
+    /**
+     * Constructor
+     * @param startRow (inclusive)
+     * @param endRow (exclusive); an empty value means no upper bound
+     * @throws IOException
+     */
+    public TableRecordReader(Text startRow, Text endRow) throws IOException {
+      LOG.debug("start construct");
+      m_row = new TreeMap<Text, byte[]>();
+      m_scanner = m_client.obtainScanner(m_cols, startRow);
+      m_endRow = endRow;
+      LOG.debug("end construct");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#close()
+     */
+    public void close() throws IOException {
+      LOG.debug("start close");
+      m_scanner.close();
+      LOG.debug("end close");
+    }
+
+    /**
+     * @return HStoreKey
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public WritableComparable createKey() {
+      return new HStoreKey();
+    }
+
+    /**
+     * @return KeyedDataArrayWritable of KeyedData
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public Writable createValue() {
+      return new KeyedDataArrayWritable();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#getPos()
+     */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordReader#getProgress()
+     */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to
+     * (HStoreKey, KeyedDataArrayWritable)
+     *
+     * @param key HStoreKey as input key.
+     * @param value KeyedDataArrayWritable as input value
+     * @return true if there was more data; false once the scanner is
+     * exhausted or the scanned row is at or past the (exclusive) end row
+     * @throws IOException
+     */
+    public boolean next(Writable key, Writable value) throws IOException {
+      LOG.debug("start next");
+      m_row.clear();
+      HStoreKey tKey = (HStoreKey)key;
+      boolean hasMore = m_scanner.next(tKey, m_row);
+
+      if(hasMore) {
+        // The end row is exclusive: stop as soon as the scanner reaches it.
+        // An empty end row means the split runs to the end of the table.
+        if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) >= 0)) {
+          hasMore = false;
+        } else {
+          KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value;
+          ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
+
+          // Each cell gets its own copy of the row key with the cell's
+          // column name filled in.
+          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
+            HStoreKey keyCol = new HStoreKey(tKey);
+            keyCol.setColumn(e.getKey());
+            columns.add(new KeyedData(keyCol, e.getValue()));
+          }
+
+          // set the output
+          rowVal.set(columns.toArray(new KeyedData[columns.size()]));
+        }
+      }
+      LOG.debug("end next");
+      return hasMore;
+    }
+
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(org.apache.hadoop.mapred.InputSplit, org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.Reporter)
+   */
+  public RecordReader getRecordReader(InputSplit split,
+      @SuppressWarnings("unused") JobConf job,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+    TableSplit tSplit = (TableSplit)split;
+    return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
+  }
+
+  /**
+   * A split will be created for each HRegion of the input table
+   *
+   * Each split ends at the next start key; the last split has an empty end
+   * row.
+   *
+   * @throws IOException if the client reports no start keys
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  @SuppressWarnings("unused")
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    LOG.debug("start getSplits");
+
+    Text[] startKeys = m_client.getStartKeys();
+    if(startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for(int i = 0; i < startKeys.length; i++) {
+      splits[i] = new TableSplit(m_tableName, startKeys[i],
+          ((i + 1) < startKeys.length) ? startKeys[i + 1] : new Text());
+      LOG.debug("split: " + i + "->" + splits[i]);
+    }
+    LOG.debug("end splits");
+    return splits;
+  }
+
+  /**
+   * Takes the table name from the first input path and the columns from
+   * {@link #COLUMN_LIST}, then opens the table. A failure to open the table
+   * is logged and not rethrown.
+   *
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    LOG.debug("start configure");
+    Path[] tableNames = job.getInputPaths();
+    m_tableName = new Text(tableNames[0].getName());
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    m_cols = new Text[colNames.length];
+    for(int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = new Text(colNames[i]);
+    }
+    m_client = new HClient(job);
+    try {
+      m_client.openTable(m_tableName);
+    } catch(Exception e) {
+      // Pass the exception as the throwable argument so the stack trace is
+      // logged along with the table that failed to open.
+      LOG.error("Failed to open table " + m_tableName, e);
+    }
+    LOG.debug("end configure");
+  }
+
+  /**
+   * Checks that the job names exactly one input path (the table) and a
+   * non-empty column list.
+   *
+   * @throws IOException if either check fails
+   * @see org.apache.hadoop.mapred.InputFormat#validateInput(org.apache.hadoop.mapred.JobConf)
+   */
+  public void validateInput(JobConf job) throws IOException {
+
+    // expecting exactly one path
+
+    Path[] tableNames = job.getInputPaths();
+    if(tableNames == null || tableNames.length != 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+
+    String colArg = job.get(COLUMN_LIST);
+    if(colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,112 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for mappers that run over an HBase table. It adapts the untyped
+ * {@link Mapper#map} call to a typed
+ * {@link #map(HStoreKey, KeyedDataArrayWritable, TableOutputCollector, Reporter)}
+ * that subclasses implement.
+ */
+public abstract class TableMap extends MapReduceBase implements Mapper {
+
+  private static final Logger LOG = Logger.getLogger(TableMap.class.getName());
+
+  /** Typed wrapper around the framework's collector; bound on the first map call. */
+  private final TableOutputCollector m_collector = new TableOutputCollector();
+
+  /** constructor */
+  public TableMap() {
+    // m_collector is created by its field initializer
+  }
+
+  /**
+   * Prepares a JobConf for a TableMap job: sets TableInputFormat as the input
+   * format, Text / KeyedDataArrayWritable as the output key / value classes,
+   * the mapper class, the table as the input path and the column list.
+   *
+   * @param table table name
+   * @param columns columns to scan
+   * @param mapper mapper class
+   * @param job job configuration
+   */
+  public static void initJob(String table, String columns,
+      Class<? extends TableMap> mapper, JobConf job) {
+
+    job.setInputFormat(TableInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(KeyedDataArrayWritable.class);
+    job.setMapperClass(mapper);
+    job.setInputPath(new Path(table));
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+  }
+
+  /**
+   * Untyped entry point called by the framework. Remembers the output
+   * collector on the first call, casts the arguments and delegates to the
+   * typed map method.
+   *
+   * @param key is of type HStoreKey
+   * @param value is of type KeyedDataArrayWritable
+   * @param output output collector
+   * @param reporter object to use for status updates
+   * @throws IOException
+   */
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+
+    LOG.debug("start map");
+    if(m_collector.collector == null) {
+      m_collector.collector = output;
+    }
+    HStoreKey typedKey = (HStoreKey)key;
+    KeyedDataArrayWritable typedValue = (KeyedDataArrayWritable)value;
+    map(typedKey, typedValue, m_collector, reporter);
+    LOG.debug("end map");
+  }
+
+  /**
+   * User-defined function applied to a single HBase record, given as a key
+   * and the record value associated with it.
+   *
+   * @param key
+   * @param value
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void map(HStoreKey key, KeyedDataArrayWritable value,
+      TableOutputCollector output, Reporter reporter) throws IOException;
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+/**
+ * Narrows the types that a Table Map/Reduce job can collect: wraps an
+ * untyped {@link OutputCollector} behind a typed collect method.
+ */
+public class TableOutputCollector {
+  /** The wrapped collector; must be set before collect is called */
+  public OutputCollector collector;
+
+  /**
+   * Forwards a Text key and its record to the wrapped collector.
+   *
+   * @param key
+   * @param value
+   * @throws IOException
+   */
+  public void collect(Text key, KeyedDataArrayWritable value)
+  throws IOException {
+    this.collector.collect(key, value);
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.hadoop.hbase.HClient;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table. The table name
+ * is read from the {@link #OUTPUT_TABLE} job parameter.
+ */
+public class TableOutputFormat extends OutputFormatBase {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+  static final Logger LOG = Logger.getLogger(TableOutputFormat.class.getName());
+
+  /** constructor */
+  public TableOutputFormat() {}
+
+  /**
+   * Writes Reduce output (key, value) to an HBase table through an HClient,
+   * one update per (key, value) pair.
+   */
+  protected class TableRecordWriter implements RecordWriter {
+    private HClient m_client;
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HClient for writing.
+     *
+     * @param client client to write through
+     */
+    public TableRecordWriter(HClient client) {
+      m_client = client;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapred.RecordWriter#close(org.apache.hadoop.mapred.Reporter)
+     */
+    public void close(@SuppressWarnings("unused") Reporter reporter) {}
+
+    /**
+     * Starts an update for the key, puts the column and data of every
+     * element of the value, then commits the update.
+     *
+     * Expect key to be of type Text
+     * Expect value to be of type KeyedDataArrayWritable
+     *
+     * NOTE(review): if a put fails, the update that was started is neither
+     * committed nor cleaned up here; confirm whether HClient needs an
+     * explicit cleanup call in that case.
+     *
+     * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+     */
+    public void write(WritableComparable key, Writable value) throws IOException {
+      LOG.debug("start write");
+      Text tKey = (Text)key;
+      KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value;
+      KeyedData[] columns = tValue.get();
+
+      // start transaction
+
+      long xid = m_client.startUpdate(tKey);
+
+      for(int i = 0; i < columns.length; i++) {
+        KeyedData column = columns[i];
+        m_client.put(xid, column.getKey().getColumn(), column.getData());
+      }
+
+      // end transaction
+
+      m_client.commit(xid);
+
+      LOG.debug("end write");
+    }
+  }
+
+  /**
+   * Creates a record writer for the table named by {@link #OUTPUT_TABLE}.
+   *
+   * @param ignored not used
+   * @param job job configuration holding the output table name
+   * @param name not used
+   * @param progress not used
+   * @return a writer backed by an HClient on which the table has been opened
+   * @throws IOException if no table name is configured, or if the client
+   * cannot be created or the table cannot be opened
+   * @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
+   */
+  @Override
+  @SuppressWarnings("unused")
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+
+    LOG.debug("start get writer");
+    String table = job.get(OUTPUT_TABLE);
+    if(table == null || table.length() == 0) {
+      throw new IOException("Must specify table name");
+    }
+    Text tableName = new Text(table);
+    HClient client;
+    try {
+      client = new HClient(job);
+      client.openTable(tableName);
+    } catch(Exception e) {
+      // Fail here instead of handing back a writer whose client is null or
+      // has no open table; such a writer could only fail later, in write.
+      LOG.error("Failed to open output table " + tableName, e);
+      if(e instanceof IOException) {
+        throw (IOException)e;
+      }
+      IOException ioe =
+        new IOException("Failed to open output table " + tableName);
+      ioe.initCause(e);
+      throw ioe;
+    }
+    LOG.debug("end get writer");
+    return new TableRecordWriter(client);
+  }
+
+  /**
+   * Checks that an output table name has been configured.
+   *
+   * @param ignored not used
+   * @param job job configuration to check
+   * @throws IOException if {@link #OUTPUT_TABLE} is not set or is empty
+   * @see org.apache.hadoop.mapred.OutputFormatBase#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  @SuppressWarnings("unused")
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if(tableName == null || tableName.length() == 0) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+/**
+ * Write a table, sorting by the input key
+ */
+public abstract class TableReduce extends MapReduceBase implements Reducer {
+ private static final Logger LOG =
+ Logger.getLogger(TableReduce.class.getName());
+
+ TableOutputCollector m_collector;
+
+ /** Constructor */
+ public TableReduce() {
+ m_collector = new TableOutputCollector();
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table, Class<? extends TableReduce> reducer,
+ JobConf job) {
+
+ job.setOutputFormat(TableOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(TableOutputFormat.OUTPUT_TABLE, table);
+ }
+
+ /**
+ * Create a unique key for table insertion by appending a local
+ * counter the given key.
+ *
+ * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @SuppressWarnings("unchecked")
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter) throws IOException {
+ LOG.debug("start reduce");
+ if(m_collector.collector == null) {
+ m_collector.collector = output;
+ }
+ reduce((Text)key, values, m_collector, reporter);
+ LOG.debug("end reduce");
+ }
+
+ /**
+ *
+ * @param key
+ * @param values
+ * @param output
+ * @param reporter
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public abstract void reduce(Text key, Iterator values,
+ TableOutputCollector output, Reporter reporter) throws IOException;
+
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+public class TableSplit implements InputSplit {
+ private Text m_tableName;
+ private Text m_startRow;
+ private Text m_endRow;
+
+ /** default constructor */
+ public TableSplit() {
+ m_tableName = new Text();
+ m_startRow = new Text();
+ m_endRow = new Text();
+ }
+
+ /**
+ * Constructor
+ * @param tableName
+ * @param startRow
+ * @param endRow
+ */
+ public TableSplit(Text tableName, Text startRow, Text endRow) {
+ this();
+ m_tableName.set(tableName);
+ m_startRow.set(startRow);
+ m_endRow.set(endRow);
+ }
+
+ /** @return table name */
+ public Text getTableName() {
+ return m_tableName;
+ }
+
+ /** @return starting row key */
+ public Text getStartRow() {
+ return m_startRow;
+ }
+
+ /** @return end row key */
+ public Text getEndRow() {
+ return m_endRow;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.InputSplit#getLength()
+ */
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting splits
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.InputSplit#getLocations()
+ */
+ public String[] getLocations() {
+ // Return a random node from the cluster for now
+ return new String[] { };
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ m_tableName.readFields(in);
+ m_startRow.readFields(in);
+ m_endRow.readFields(in);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ m_tableName.write(out);
+ m_startRow.write(out);
+ m_endRow.write(out);
+ }
+
+ @Override
+ public String toString() {
+ return m_tableName +"," + m_startRow + "," + m_endRow;
+ }
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml Sat Jun 30 04:11:32 2007
@@ -11,13 +11,13 @@
</property>
<property>
<name>hbase.client.pause</name>
- <value>3000</value>
+ <value>5000</value>
<description>General client pause value. Used mostly as value to wait
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
- <value>2</value>
+ <value>5</value>
<description>Maximum retries. Used as maximum for all retryable
operations such as fetching of the root region from root region
server, getting a cell's value, starting a row update, etc.
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java Sat Jun 30 04:11:32 2007
@@ -19,7 +19,6 @@
import java.io.UnsupportedEncodingException;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -141,17 +140,4 @@
return region;
}
- private HRegion createNewHRegion(FileSystem fs, Path dir,
- Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
- Text endKey) throws IOException {
-
- HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
- Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
- fs.mkdirs(regionDir);
-
- return new HRegion(dir,
- new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
- fs, conf, info, null);
- }
-
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Sat Jun 30 04:11:32 2007
@@ -15,10 +15,14 @@
*/
package org.apache.hadoop.hbase;
+import java.io.IOException;
+
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
/**
* Abstract base class for test cases. Performs all static initialization
@@ -43,4 +47,18 @@
protected Path getUnitTestdir(String testName) {
return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
}
+
+ /**
+  * Builds an HRegionInfo from the arguments, creates the region's directory
+  * under <code>dir</code>, and returns a new HRegion whose HLog lives in the
+  * HREGION_LOGDIR_NAME subdirectory of that region directory.
+  */
+ protected HRegion createNewHRegion(FileSystem fs, Path dir,
+     Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
+     Text endKey) throws IOException {
+
+   HRegionInfo regionInfo = new HRegionInfo(regionId, desc, startKey, endKey);
+   Path regionDir = HStoreFile.getHRegionDir(dir, regionInfo.regionName);
+   fs.mkdirs(regionDir);
+
+   Path logDir = new Path(regionDir, HConstants.HREGION_LOGDIR_NAME);
+   HLog log = new HLog(fs, logDir, conf);
+   return new HRegion(dir, log, fs, conf, regionInfo, null);
+ }
+
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Sat Jun 30 04:11:32 2007
@@ -41,7 +41,8 @@
private Thread masterThread;
List<HRegionServer> regionServers;
List<Thread> regionThreads;
-
+ private boolean deleteOnExit = true;
+
/**
* Starts a MiniHBaseCluster on top of a new MiniDFSCluster
*
@@ -51,9 +52,23 @@
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes)
throws IOException {
- this(conf, nRegionNodes, true);
+ this(conf, nRegionNodes, true, true, true);
+ }
+
+ /**
+ * Start a MiniHBaseCluster. Use the native file system unless
+ * miniHdfsFilesystem is set to true.
+ *
+ * @param conf
+ * @param nRegionNodes
+ * @param miniHdfsFilesystem
+ * @throws IOException
+ */
+ public MiniHBaseCluster(Configuration conf, int nRegionNodes,
+ final boolean miniHdfsFilesystem) throws IOException {
+ this(conf, nRegionNodes, miniHdfsFilesystem, true, true);
}
-
+
/**
* Starts a MiniHBaseCluster on top of an existing HDFSCluster
*
@@ -70,7 +85,7 @@
this.cluster = dfsCluster;
init(nRegionNodes);
}
-
+
/**
* Constructor.
* @param conf
@@ -78,16 +93,20 @@
* @param miniHdfsFilesystem If true, set the hbase mini
* cluster atop a mini hdfs cluster. Otherwise, use the
* filesystem configured in <code>conf</code>.
+ * @param format passed to the MiniDFSCluster constructor as its format flag
+ * @param deleteOnExit if true, delete the mini hdfs files on shutdown
* @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
- final boolean miniHdfsFilesystem)
+ final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit)
throws IOException {
this.conf = conf;
-
+ this.deleteOnExit = deleteOnExit;
+
if (miniHdfsFilesystem) {
try {
- this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
+ this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
+
} catch(Throwable t) {
LOG.error("Failed setup of mini dfs cluster", t);
t.printStackTrace();
@@ -112,7 +131,7 @@
if(this.conf.get(MASTER_ADDRESS) == null) {
this.conf.set(MASTER_ADDRESS, "localhost:0");
}
-
+
// Create the master
this.master = new HMaster(conf);
this.masterThread = new Thread(this.master, "HMaster");
@@ -120,7 +139,7 @@
// Start up the master
LOG.info("Starting HMaster");
masterThread.start();
-
+
// Set the master's port for the HRegionServers
String address = master.getMasterAddress().toString();
this.conf.set(MASTER_ADDRESS, address);
@@ -137,15 +156,24 @@
}
}
+ /**
+  * Returns the MiniDFSCluster held by this HBase cluster.
+  *
+  * @return MiniDFSCluster
+  */
+ public MiniDFSCluster getDFSCluster() {
+   return this.cluster;
+ }
+
private void startRegionServers(final int nRegionNodes)
- throws IOException {
+ throws IOException {
this.regionServers = new ArrayList<HRegionServer>(nRegionNodes);
this.regionThreads = new ArrayList<Thread>(nRegionNodes);
for(int i = 0; i < nRegionNodes; i++) {
startRegionServer();
}
}
-
+
void startRegionServer() throws IOException {
HRegionServer hsr = new HRegionServer(this.conf);
this.regionServers.add(hsr);
@@ -153,7 +181,7 @@
t.start();
this.regionThreads.add(t);
}
-
+
/**
* @return Returns the rpc address actually used by the master server, because
* the supplied port is not necessarily the actual port used.
@@ -161,7 +189,7 @@
public HServerAddress getHMasterAddress() {
return master.getMasterAddress();
}
-
+
/**
* Shut down the specified region server cleanly
*
@@ -170,15 +198,20 @@
public void stopRegionServer(int serverNumber) {
if (serverNumber >= regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
- "serverNumber > number of region servers");
+ "serverNumber > number of region servers");
}
this.regionServers.get(serverNumber).stop();
}
-
+
+ /**
+ * Wait for the specified region server to stop
+ *
+ * @param serverNumber
+ */
public void waitOnRegionServer(int serverNumber) {
if (serverNumber >= regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
- "serverNumber > number of region servers");
+ "serverNumber > number of region servers");
}
try {
this.regionThreads.get(serverNumber).join();
@@ -186,7 +219,7 @@
e.printStackTrace();
}
}
-
+
/**
* Cause a region server to exit without cleaning up
*
@@ -195,11 +228,11 @@
public void abortRegionServer(int serverNumber) {
if(serverNumber >= this.regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
- "serverNumber > number of region servers");
+ "serverNumber > number of region servers");
}
this.regionServers.get(serverNumber).abort();
}
-
+
/** Shut down the HBase cluster */
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");
@@ -218,6 +251,7 @@
}
try {
masterThread.join();
+
} catch(InterruptedException e) {
// continue
}
@@ -227,12 +261,14 @@
LOG.info("Shutting down Mini DFS cluster");
cluster.shutdown();
}
-
+
// Delete all DFS files
- deleteFile(new File(System.getProperty(
- StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
+ if(deleteOnExit) {
+ deleteFile(new File(System.getProperty(
+ StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
+ }
}
-
+
private void deleteFile(File f) {
if(f.isDirectory()) {
File[] children = f.listFiles();
@@ -242,4 +278,4 @@
}
f.delete();
}
-}
\ No newline at end of file
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?view=diff&rev=552127&r1=552126&r2=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Sat Jun 30 04:11:32 2007
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
/**
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java?view=auto&rev=552127
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java Sat Jun 30 04:11:32 2007
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableOutputCollector;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+
+/**
+ * Test Map/Reduce job over HBase tables
+ */
+public class TestTableMapReduce extends HBaseTestCase {
+ static final String TABLE_NAME = "test"; // table opened by the job and by scanTable()
+ static final String INPUT_COLUMN = "contents:"; // column the mapper expects as input
+ static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+ static final String OUTPUT_COLUMN = "text:"; // column the mapper re-keys its output to
+ static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+
+ private Random rand;
+ private HTableDescriptor desc;
+ // Mini clusters, file system and directory; all assigned in setUp()
+ private MiniDFSCluster dfsCluster = null;
+ private FileSystem fs;
+ private Path dir;
+ private MiniHBaseCluster hCluster = null;
+ // Cell values written to TEXT_INPUT_COLUMN in setUp(), one row per entry
+ private byte[][] values = {
+ "0123".getBytes(),
+ "abcd".getBytes(),
+ "wxyz".getBytes(),
+ "6789".getBytes()
+ };
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ rand = new Random();
+ // Build the descriptor from TABLE_NAME so it matches the table the job and scan open
+ desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+ // Bring up a mini DFS cluster and create the /hbase directory on its file system
+ dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
+ fs = dfsCluster.getFileSystem();
+ dir = new Path("/hbase");
+ fs.mkdirs(dir);
+
+ // create the root and meta regions and insert the data region into the meta
+ HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null);
+ HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null);
+ HRegion.addRegionToMETA(root, meta);
+
+ HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), null, null);
+ HRegion.addRegionToMETA(meta, region);
+
+ // insert one row per entry of values into the input column of the test table
+ for(int i = 0; i < values.length; i++) {
+ long lockid = region.startUpdate(new Text("row_"
+ + String.format("%1$05d", i)));
+
+ region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
+ region.commit(lockid);
+ }
+
+ // Close the regions and delete their logs before the HBase cluster is started
+ region.close();
+ region.getLog().closeAndDelete();
+ meta.close();
+ meta.getLog().closeAndDelete();
+ root.close();
+ root.getLog().closeAndDelete();
+
+ // Start up HBase cluster
+ hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ // Let the base class clean up first, then stop the mini HBase cluster
+ super.tearDown();
+ if(hCluster == null) {
+ return; // setUp() never assigned a cluster
+ }
+ hCluster.shutdown();
+ }
+
+ /**
+ * Pass the given key and the processed record to reduce
+ */
+ public static class ProcessContentsMapper extends TableMap {
+
+ /** constructor */
+ public ProcessContentsMapper() {
+ super();
+ }
+
+ /**
+ * Pass the key, and reversed value to reduce
+ *
+ * @throws IOException if the row does not carry exactly one column, or that column is not the input column
+ * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @Override
+ public void map(HStoreKey key, KeyedDataArrayWritable value,
+ TableOutputCollector output,
+ @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+ Text tKey = key.getRow();
+ KeyedData[] columns = value.get();
+
+ if(columns.length != 1) {
+ throw new IOException("There should only be one input column");
+ }
+
+ if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
+ throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+ + " but got: " + columns[0].getKey().getColumn());
+ }
+
+ // Get the input column key and change it to the output column key
+
+ HStoreKey column = columns[0].getKey();
+ column.setColumn(TEXT_OUTPUT_COLUMN);
+
+ // Get the original value and reverse it. StringBuilder.reverse() replaces
+ // the hand-written char loop and keeps surrogate pairs in order.
+
+ String originalValue = new String(columns[0].getData());
+ String newValue = new StringBuilder(originalValue).reverse().toString();
+
+ // Now set the value to be collected: the one column, re-keyed to the
+ // output column and carrying the reversed bytes
+
+ columns[0] = new KeyedData(column, newValue.getBytes());
+ value.set(columns);
+
+ output.collect(tKey, value);
+ }
+ }
+
+ /**
+ * Test HBase map/reduce: print the table, run the job on a mini map/reduce cluster, print the table again
+ * @throws IOException
+ */
+ @SuppressWarnings("static-access")
+ public void testTableMapReduce() throws IOException {
+ System.out.println("Print table contents before map/reduce");
+ scanTable(conf);
+ // NOTE(review): nothing below asserts on the table contents; the scans only print them
+ @SuppressWarnings("deprecation")
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1);
+
+ try {
+ JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+ jobConf.setJobName("process column contents");
+ jobConf.setNumMapTasks(1);
+ jobConf.setNumReduceTasks(1);
+ // Map side: ProcessContentsMapper over INPUT_COLUMN of TABLE_NAME
+ ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN,
+ ProcessContentsMapper.class, jobConf);
+ // Reduce side: IdentityTableReduce registered for TABLE_NAME
+ IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf);
+
+ JobClient.runJob(jobConf);
+
+ } finally {
+ mrCluster.shutdown(); // stop the map/reduce cluster even when the job throws
+ }
+
+ System.out.println("Print table contents after map/reduce");
+ scanTable(conf);
+ }
+
+ private void scanTable(Configuration conf) throws IOException {
+ // Open the test table and scan both columns starting at HClient.EMPTY_START_ROW
+ HClient hclient = new HClient(conf);
+ hclient.openTable(new Text(TABLE_NAME));
+
+ Text[] scanColumns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
+ HScannerInterface rows =
+ hclient.obtainScanner(scanColumns, HClient.EMPTY_START_ROW);
+
+ try {
+ HStoreKey rowKey = new HStoreKey();
+ TreeMap<Text, byte[]> cells = new TreeMap<Text, byte[]>();
+
+ // Print one line per row: the row key, then every column/value pair
+ while(rows.next(rowKey, cells)) {
+ System.out.print("row: " + rowKey.getRow());
+ for(Map.Entry<Text, byte[]> cell: cells.entrySet()) {
+ Text columnName = cell.getKey();
+ String cellText = new String(cell.getValue());
+ System.out.print(" column: " + columnName + " value: " + cellText);
+ }
+ System.out.println();
+ }
+
+ } finally {
+ // release the scanner whether or not the scan completed
+ rows.close();
+ }
+ }
+}