Posted to commits@hbase.apache.org by st...@apache.org on 2010/05/07 21:26:51 UTC
svn commit: r942186 [5/18] - in /hadoop/hbase/trunk: ./
contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/
core/src/main/java/org/apache/hadoop/hbase/
core/src/main/java/org/apache/hadoop/hbase/client/
core/src/main/java/org/apache/h...
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri May 7 19:26:45 2010
@@ -63,28 +63,28 @@ import java.util.concurrent.LinkedBlocki
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
- *
- *
+ *
+ *
* <p>Copied local so can fix HBASE-900.
- *
+ *
* @see HBaseClient
*/
public abstract class HBaseServer {
-
+
/**
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
-
+
// 1 : Introduce ping and server does not throw away RPCs
- // 3 : RPC was refactored in 0.19
+ // 3 : RPC was refactored in 0.19
public static final byte CURRENT_VERSION = 3;
-
+
/**
* How many calls/handler are allowed in the queue.
*/
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
-
+
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@@ -100,13 +100,13 @@ public abstract class HBaseServer {
public static HBaseServer get() {
return SERVER.get();
}
-
+
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns.
*/
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
-
- /** Returns the remote side ip address when invoked inside an RPC
+
+ /** Returns the remote side ip address when invoked inside an RPC
 * Returns null in case of an error.
* @return InetAddress
*/
@@ -126,23 +126,23 @@ public abstract class HBaseServer {
return (addr == null) ? null : addr.getHostAddress();
}
- protected String bindAddress;
+ protected String bindAddress;
protected int port; // port we listen on
private int handlerCount; // number of handler threads
protected Class<? extends Writable> paramClass; // class of call parameters
- protected int maxIdleTime; // the maximum idle time after
+ protected int maxIdleTime; // the maximum idle time after
// which a client may be
// disconnected
protected int thresholdIdleConnections; // the number of idle
- // connections after which we
- // will start cleaning up idle
+ // connections after which we
+ // will start cleaning up idle
// connections
- int maxConnectionsToNuke; // the max number of
+ int maxConnectionsToNuke; // the max number of
// connections to nuke
// during a cleanup
-
+
protected HBaseRpcMetrics rpcMetrics;
-
+
protected Configuration conf;
@SuppressWarnings({"FieldCanBeLocal"})
@@ -165,7 +165,7 @@ public abstract class HBaseServer {
protected HBaseRPCErrorHandler errorHandler = null;
/**
- * A convenience method to bind to a given address and report
+ * A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
* @param socket the socket to bind
* @param address the address to bind to
@@ -174,13 +174,13 @@ public abstract class HBaseServer {
* @throws UnknownHostException if the address isn't a valid host name
* @throws IOException other random errors from bind
*/
- public static void bind(ServerSocket socket, InetSocketAddress address,
+ public static void bind(ServerSocket socket, InetSocketAddress address,
int backlog) throws IOException {
try {
socket.bind(address, backlog);
} catch (BindException e) {
BindException bindException =
- new BindException("Problem binding to " + address + " : " +
+ new BindException("Problem binding to " + address + " : " +
e.getMessage());
bindException.initCause(e);
throw bindException;
@@ -188,7 +188,7 @@ public abstract class HBaseServer {
// If they try to bind to a different host's address, give a better
// error message.
if ("Unresolved address".equals(e.getMessage())) {
- throw new UnknownHostException("Invalid hostname for server: " +
+ throw new UnknownHostException("Invalid hostname for server: " +
address.getHostName());
}
throw e;
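As a usage sketch, the bind() helper above surfaces the failing address instead of a bare BindException; the port and backlog below are example values, not HBase defaults:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.channels.ServerSocketChannel;
    import org.apache.hadoop.hbase.ipc.HBaseServer;

    public class BindSketch {
      public static void main(String[] args) throws IOException {
        ServerSocket socket = ServerSocketChannel.open().socket();
        // On failure, bind() rethrows with the offending address in the message.
        HBaseServer.bind(socket, new InetSocketAddress("0.0.0.0", 60020), 128);
      }
    }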
@@ -211,7 +211,7 @@ public abstract class HBaseServer {
this.timestamp = System.currentTimeMillis();
this.response = null;
}
-
+
@Override
public String toString() {
return param.toString() + " from " + connection.toString();
@@ -224,17 +224,17 @@ public abstract class HBaseServer {
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
-
+
private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private InetSocketAddress address; //the address we bind at
private Random rand = new Random();
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
//-tion (for idle connections) ran
- private long cleanupInterval = 10000; //the minimum interval between
+ private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-
+
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
@@ -255,7 +255,7 @@ public abstract class HBaseServer {
/** cleanup connections from connectionList. Choose a random range
* to scan and also have a limit on the number of the connections
     * that will be cleaned up per run. The criterion for cleanup is the time
- * for which the connection was idle. If 'force' is true then all
+ * for which the connection was idle. If 'force' is true then all
* connections will be looked at for the cleanup.
* @param force all connections will be looked at for cleanup
*/
@@ -336,7 +336,7 @@ public abstract class HBaseServer {
}
} else {
// we can run out of memory if we have too many threads
- // log the event and sleep for a minute and give
+ // log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key);
@@ -363,7 +363,7 @@ public abstract class HBaseServer {
selector= null;
acceptChannel= null;
-
+
// clean up all connections
while (!connectionList.isEmpty()) {
closeConnection(connectionList.remove(0));
@@ -385,7 +385,7 @@ public abstract class HBaseServer {
InetSocketAddress getAddress() {
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
}
-
+
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
@@ -415,10 +415,10 @@ public abstract class HBaseServer {
int count = 0;
Connection c = (Connection)key.attachment();
if (c == null) {
- return;
+ return;
}
c.setLastContact(System.currentTimeMillis());
-
+
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
@@ -429,7 +429,7 @@ public abstract class HBaseServer {
}
if (count < 0) {
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " +
+ LOG.debug(getName() + ": disconnecting client " +
c.getHostAddress() + ". Number of active connections: "+
numConnections);
closeConnection(c);
@@ -438,7 +438,7 @@ public abstract class HBaseServer {
else {
c.setLastContact(System.currentTimeMillis());
}
- }
+ }
synchronized void doStop() {
if (selector != null) {
@@ -459,7 +459,7 @@ public abstract class HBaseServer {
private class Responder extends Thread {
private Selector writeSelector;
private int pending; // connections waiting to register
-
+
final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException {
@@ -502,7 +502,7 @@ public abstract class HBaseServer {
//
LOG.debug("Checking for old call responses.");
ArrayList<Call> calls;
-
+
// get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
@@ -510,12 +510,12 @@ public abstract class HBaseServer {
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
- if (call != null && key.channel() == call.connection.channel) {
+ if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}
-
+
for(Call call : calls) {
doPurge(call, now);
}
@@ -535,7 +535,7 @@ public abstract class HBaseServer {
try { Thread.sleep(60000); } catch (Exception ignored) {}
}
} catch (Exception e) {
- LOG.warn("Exception in Responder " +
+ LOG.warn("Exception in Responder " +
StringUtils.stringifyException(e));
}
}
@@ -568,7 +568,7 @@ public abstract class HBaseServer {
}
//
- // Remove calls that have been pending in the responseQueue
+ // Remove calls that have been pending in the responseQueue
// for a long time.
//
private void doPurge(Call call, long now) {
@@ -635,18 +635,18 @@ public abstract class HBaseServer {
}
} else {
//
- // If we were unable to write the entire response out, then
- // insert in Selector queue.
+ // If we were unable to write the entire response out, then
+ // insert in Selector queue.
//
call.connection.responseQueue.addFirst(call);
-
+
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
-
+
incPending();
try {
- // Wakeup the thread blocked on select, only then can the call
+ // Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
@@ -659,7 +659,7 @@ public abstract class HBaseServer {
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
- call.connection + " Wrote partial " + numBytes +
+ call.connection + " Wrote partial " + numBytes +
" bytes.");
}
}
@@ -717,7 +717,7 @@ public abstract class HBaseServer {
private long lastContact;
private int dataLength;
protected Socket socket;
- // Cache the remote host & port info so that even if the socket is
+ // Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
private String hostAddress;
private int remotePort;
@@ -745,13 +745,13 @@ public abstract class HBaseServer {
socketSendBufferSize);
}
}
- }
+ }
@Override
public String toString() {
- return getHostAddress() + ":" + remotePort;
+ return getHostAddress() + ":" + remotePort;
}
-
+
public String getHostAddress() {
return hostAddress;
}
@@ -768,17 +768,17 @@ public abstract class HBaseServer {
private boolean isIdle() {
return rpcCount == 0;
}
-
+
/* Decrement the outstanding RPC count */
protected void decRpcCount() {
rpcCount--;
}
-
+
/* Increment the outstanding RPC count */
private void incRpcCount() {
rpcCount++;
}
-
+
protected boolean timedOut(long currentTime) {
return isIdle() && currentTime - lastContact > maxIdleTime;
}
@@ -787,14 +787,14 @@ public abstract class HBaseServer {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
       * then iterate until we read the first RPC or until there is no data left.
- */
+ */
int count;
if (dataLengthBuffer.remaining() > 0) {
- count = channelRead(channel, dataLengthBuffer);
- if (count < 0 || dataLengthBuffer.remaining() > 0)
+ count = channelRead(channel, dataLengthBuffer);
+ if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
-
+
if (!versionRead) {
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
@@ -803,13 +803,13 @@ public abstract class HBaseServer {
return count;
}
int version = versionBuffer.get(0);
-
- dataLengthBuffer.flip();
+
+ dataLengthBuffer.flip();
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
- LOG.warn("Incorrect header or version mismatch from " +
+ LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
- " got version " + version +
+ " got version " + version +
" expected version " + CURRENT_VERSION);
return -1;
}
@@ -817,11 +817,11 @@ public abstract class HBaseServer {
versionRead = true;
continue;
}
-
+
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
-
+
if (dataLength == HBaseClient.PING_CALL_ID) {
dataLengthBuffer.clear();
return 0; //ping message
@@ -829,9 +829,9 @@ public abstract class HBaseServer {
data = ByteBuffer.allocate(dataLength);
incRpcCount(); // Increment the rpc count
}
-
+
count = channelRead(channel, data);
-
+
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
@@ -844,7 +844,7 @@ public abstract class HBaseServer {
headerRead = true;
data = null;
continue;
- }
+ }
return count;
}
}
@@ -858,18 +858,18 @@ public abstract class HBaseServer {
new DataInputStream(new ByteArrayInputStream(data.array()));
ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
}
-
+
private void processData() throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(data.array()));
int id = dis.readInt(); // try to read an id
-
+
if (LOG.isDebugEnabled())
LOG.debug(" got #" + id);
-
+
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
- param.readFields(dis);
-
+ param.readFields(dis);
+
Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; maybe blocked here
}
@@ -907,11 +907,11 @@ public abstract class HBaseServer {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
call.connection);
-
+
String errorClass = null;
String error = null;
Writable value = null;
-
+
CurCall.set(call);
UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
UserGroupInformation.setCurrentUser(call.connection.ticket);
@@ -969,22 +969,22 @@ public abstract class HBaseServer {
}
}
-
+
protected HBaseServer(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
+ Class<? extends Writable> paramClass, int handlerCount,
Configuration conf)
- throws IOException
+ throws IOException
{
this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port));
}
/* Constructs a server listening on the named port and address. Parameters passed must
   * be of the named class. The <code>handlerCount</code> determines
* the number of handler threads that will be used to process calls.
- *
+ *
*/
- protected HBaseServer(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
- Configuration conf, String serverName)
+ protected HBaseServer(String bindAddress, int port,
+ Class<? extends Writable> paramClass, int handlerCount,
+ Configuration conf, String serverName)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
@@ -993,14 +993,14 @@ public abstract class HBaseServer {
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
- this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+ this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
-
+
// Start the listener here and let it bind to the port
listener = new Listener();
- this.port = listener.getAddress().getPort();
+ this.port = listener.getAddress().getPort();
this.rpcMetrics = new HBaseRpcMetrics(serverName,
Integer.toString(this.port));
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
@@ -1017,7 +1017,7 @@ public abstract class HBaseServer {
}
connection.close();
}
-
+
/** Sets the socket buffer size used for responding to RPCs.
* @param size send size
*/
@@ -1028,7 +1028,7 @@ public abstract class HBaseServer {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
-
+
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
@@ -1073,11 +1073,11 @@ public abstract class HBaseServer {
public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress();
}
-
- /** Called for each call.
+
+ /** Called for each call.
* @param param writable parameter
* @param receiveTime time
- * @return Writable
+ * @return Writable
* @throws IOException e
*/
public abstract Writable call(Writable param, long receiveTime)
@@ -1090,7 +1090,7 @@ public abstract class HBaseServer {
public int getNumOpenConnections() {
return numConnections;
}
-
+
/**
* The number of rpc calls in the queue.
* @return The number of rpc calls in the queue.
@@ -1105,22 +1105,22 @@ public abstract class HBaseServer {
*/
public void setErrorHandler(HBaseRPCErrorHandler handler) {
this.errorHandler = handler;
- }
+ }
/**
- * When the read or write buffer size is larger than this limit, i/o will be
+ * When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be
   * smaller.
*/
private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
-
+
/**
* This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
- * If the amount of data is large, it writes to channel in smaller chunks.
- * This is to avoid jdk from creating many direct buffers as the size of
+   * If the amount of data is large, it writes to the channel in smaller chunks.
+   * This keeps the JDK from creating many direct buffers as the size of the
* buffer increases. This also minimizes extra copies in NIO layer
- * as a result of multiple write operations required to write a large
- * buffer.
+ * as a result of multiple write operations required to write a large
+ * buffer.
*
* @param channel writable byte channel to write to
* @param buffer buffer to write
@@ -1128,7 +1128,7 @@ public abstract class HBaseServer {
* @throws java.io.IOException e
* @see WritableByteChannel#write(ByteBuffer)
*/
- protected static int channelWrite(WritableByteChannel channel,
+ protected static int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
@@ -1136,17 +1136,17 @@ public abstract class HBaseServer {
/**
* This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
- * If the amount of data is large, it writes to channel in smaller chunks.
- * This is to avoid jdk from creating many direct buffers as the size of
+   * If the amount of data is large, it reads from the channel in smaller chunks.
+   * This keeps the JDK from creating many direct buffers as the size of the
   * ByteBuffer increases. There should not be any performance degradation.
- *
+ *
   * @param channel readable byte channel to read from
   * @param buffer buffer to read into
   * @return number of bytes read
* @throws java.io.IOException e
* @see ReadableByteChannel#read(ByteBuffer)
*/
- protected static int channelRead(ReadableByteChannel channel,
+ protected static int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {
return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
@@ -1156,7 +1156,7 @@ public abstract class HBaseServer {
* Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
* and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
* one of readCh or writeCh should be non-null.
- *
+ *
* @param readCh read channel
* @param writeCh write channel
* @param buf buffer to read or write into/out of
@@ -1165,31 +1165,31 @@ public abstract class HBaseServer {
* @see #channelRead(ReadableByteChannel, ByteBuffer)
* @see #channelWrite(WritableByteChannel, ByteBuffer)
*/
- private static int channelIO(ReadableByteChannel readCh,
+ private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
-
+
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
int ret = 0;
-
+
while (buf.remaining() > 0) {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
-
- ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
-
+
+ ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
+
if (ret < ioSize) {
break;
}
} finally {
- buf.limit(originalLimit);
+ buf.limit(originalLimit);
}
}
- int nBytes = initialRemaining - buf.remaining();
+ int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
- }
+ }
}
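channelWrite(), channelRead(), and channelIO() above cap every I/O pass at NIO_BUFFER_LIMIT by temporarily lowering the buffer's limit, so the JDK never materializes one huge temporary direct buffer per call. A standalone restatement of the write half, as a sketch only; the real channelIO() also covers reads and propagates a -1 end-of-stream result:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    public class ChunkedWriteSketch {
      private static final int NIO_BUFFER_LIMIT = 8 * 1024; // mirrors HBaseServer

      public static int chunkedWrite(WritableByteChannel ch, ByteBuffer buf)
          throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        while (buf.remaining() > 0) {
          int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
          try {
            buf.limit(buf.position() + ioSize); // cap this pass at one chunk
            if (ch.write(buf) < ioSize) {
              break; // channel would block; report partial progress
            }
          } finally {
            buf.limit(originalLimit); // always restore the caller's limit
          }
        }
        return initialRemaining - buf.remaining();
      }
    }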
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Fri May 7 19:26:45 2010
@@ -31,16 +31,16 @@ import java.io.IOException;
* Clients interact with the HMasterInterface to gain access to meta-level
* HBase functionality, like finding an HRegionServer and creating/destroying
* tables.
- *
+ *
* <p>NOTE: if you change the interface, you must change the RPC version
* number in HBaseRPCProtocolVersion
- *
+ *
*/
public interface HMasterInterface extends HBaseRPCProtocolVersion {
/** @return true if master is available */
public boolean isMasterRunning();
-
+
// Admin tools would use these cmds
/**
@@ -60,7 +60,7 @@ public interface HMasterInterface extend
* @throws IOException e
*/
public void deleteTable(final byte [] tableName) throws IOException;
-
+
/**
* Adds a column to the specified table
* @param tableName table to modify
@@ -77,8 +77,8 @@ public interface HMasterInterface extend
* @param descriptor new column descriptor
* @throws IOException e
*/
- public void modifyColumn(final byte [] tableName, final byte [] columnName,
- HColumnDescriptor descriptor)
+ public void modifyColumn(final byte [] tableName, final byte [] columnName,
+ HColumnDescriptor descriptor)
throws IOException;
@@ -90,17 +90,17 @@ public interface HMasterInterface extend
*/
public void deleteColumn(final byte [] tableName, final byte [] columnName)
throws IOException;
-
+
/**
* Puts the table on-line (only needed if table has been previously taken offline)
* @param tableName table to enable
* @throws IOException e
*/
public void enableTable(final byte [] tableName) throws IOException;
-
+
/**
* Take table offline
- *
+ *
* @param tableName table to take offline
* @throws IOException e
*/
@@ -108,7 +108,7 @@ public interface HMasterInterface extend
/**
* Modify a table's metadata
- *
+ *
* @param tableName table to modify
* @param op the operation to do
* @param args arguments for operation
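Clients do not usually hold an HMasterInterface proxy directly; the admin operations above are normally reached through HBaseAdmin, which proxies them over RPC. A hedged sketch, assuming the HBaseAdmin API of this era; "mytable" is an example name:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class AdminSketch {
      public static void main(String[] args) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
        admin.disableTable("mytable"); // proxies the master's disable operation
        admin.enableTable("mytable");  // proxies HMasterInterface.enableTable()
      }
    }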
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Fri May 7 19:26:45 2010
@@ -27,13 +27,13 @@ import org.apache.hadoop.io.MapWritable;
import java.io.IOException;
/**
- * HRegionServers interact with the HMasterRegionInterface to report on local
+ * HRegionServers interact with the HMasterRegionInterface to report on local
* goings-on and to obtain data-handling instructions from the HMaster.
* <p>Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker.
- *
+ *
* <p>NOTE: if you change the interface, you must change the RPC version
* number in HBaseRPCProtocolVersion
- *
+ *
*/
public interface HMasterRegionInterface extends HBaseRPCProtocolVersion {
@@ -49,16 +49,16 @@ public interface HMasterRegionInterface
/**
* Called to renew lease, tell master what the region server is doing and to
* receive new instructions from the master
- *
+ *
* @param info server's address and start code
* @param msgs things the region server wants to tell the master
- * @param mostLoadedRegions Array of HRegionInfos that should contain the
+ * @param mostLoadedRegions Array of HRegionInfos that should contain the
* reporting server's most loaded regions. These are candidates for being
* rebalanced.
* @return instructions from the master to the region server
* @throws IOException e
*/
- public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
+ public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
HRegionInfo mostLoadedRegions[])
throws IOException;
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri May 7 19:26:45 2010
@@ -35,26 +35,26 @@ import java.io.IOException;
/**
* Clients interact with HRegionServers using a handle to the HRegionInterface.
- *
+ *
* <p>NOTE: if you change the interface, you must change the RPC version
* number in HBaseRPCProtocolVersion
*/
public interface HRegionInterface extends HBaseRPCProtocolVersion {
- /**
+ /**
* Get metainfo about an HRegion
- *
+ *
* @param regionName name of the region
* @return HRegionInfo object for region
* @throws NotServingRegionException e
*/
public HRegionInfo getRegionInfo(final byte [] regionName)
throws NotServingRegionException;
-
+
/**
- * Return all the data for the row that matches <i>row</i> exactly,
+ * Return all the data for the row that matches <i>row</i> exactly,
   * or the one that immediately precedes it.
- *
+ *
* @param regionName region name
* @param row row key
* @param family Column family to look for row in.
@@ -66,11 +66,11 @@ public interface HRegionInterface extend
throws IOException;
/**
- *
+ *
* @return the regions served by this regionserver
*/
public HRegion [] getOnlineRegionsAsArray();
-
+
/**
* Perform Get operation.
* @param regionName name of region to get from
@@ -90,17 +90,17 @@ public interface HRegionInterface extend
public boolean exists(byte [] regionName, Get get) throws IOException;
/**
- * Put data into the specified region
+ * Put data into the specified region
* @param regionName region name
* @param put the data to be put
* @throws IOException e
*/
public void put(final byte [] regionName, final Put put)
throws IOException;
-
+
/**
* Put an array of puts into the specified region
- *
+ *
* @param regionName region name
* @param puts array of puts to execute
   * @return The number of processed puts. Returns -1 if all Puts
@@ -111,7 +111,7 @@ public interface HRegionInterface extend
throws IOException;
/**
- * Deletes all the KeyValues that match those found in the Delete object,
+ * Deletes all the KeyValues that match those found in the Delete object,
   * if their ts is <= that of the Delete. In case of a delete with a specific ts it
* only deletes that specific KeyValue.
* @param regionName region name
@@ -123,7 +123,7 @@ public interface HRegionInterface extend
/**
* Put an array of deletes into the specified region
- *
+ *
* @param regionName region name
* @param deletes delete array to execute
* @return The number of processed deletes. Returns -1 if all Deletes
@@ -137,7 +137,7 @@ public interface HRegionInterface extend
   * Atomically checks if a row/family/qualifier value matches the expectedValue.
   * If it does, it adds the put. If the passed expected value is null, then the
   * check is for non-existence of the row/column.
- *
+ *
* @param regionName region name
* @param row row to check
* @param family column family
@@ -147,16 +147,16 @@ public interface HRegionInterface extend
* @throws IOException e
   * @return true if the new put was executed, false otherwise
*/
- public boolean checkAndPut(final byte[] regionName, final byte [] row,
+ public boolean checkAndPut(final byte[] regionName, final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
throws IOException;
-
+
/**
* Atomically increments a column value. If the column value isn't long-like,
   * this could throw an exception. If the passed expected value is null, then the
   * check is for non-existence of the row/column.
- *
+ *
* @param regionName region name
* @param row row to check
* @param family column family
@@ -166,18 +166,18 @@ public interface HRegionInterface extend
* @return new incremented column value
* @throws IOException e
*/
- public long incrementColumnValue(byte [] regionName, byte [] row,
+ public long incrementColumnValue(byte [] regionName, byte [] row,
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException;
-
-
+
+
//
// remote scanner interface
//
/**
* Opens a remote scanner with a RowFilter.
- *
+ *
* @param regionName name of region to scan
* @param scan configured scan object
* @return scannerId scanner identifier used in other calls
@@ -185,7 +185,7 @@ public interface HRegionInterface extend
*/
public long openScanner(final byte [] regionName, final Scan scan)
throws IOException;
-
+
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
@@ -193,7 +193,7 @@ public interface HRegionInterface extend
* @throws IOException e
*/
public Result next(long scannerId) throws IOException;
-
+
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
@@ -204,10 +204,10 @@ public interface HRegionInterface extend
* @throws IOException e
*/
public Result [] next(long scannerId, int numberOfRows) throws IOException;
-
+
/**
* Close a scanner
- *
+ *
* @param scannerId the scanner id returned by openScanner
* @throws IOException e
*/
@@ -233,15 +233,15 @@ public interface HRegionInterface extend
*/
public void unlockRow(final byte [] regionName, final long lockId)
throws IOException;
-
-
+
+
/**
* Method used when a master is taking the place of another failed one.
* @return All regions assigned on this region server
* @throws IOException e
*/
public HRegionInfo[] getRegionsAssignment() throws IOException;
-
+
/**
* Method used when a master is taking the place of another failed one.
* @return The HSI
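In practice clients reach the atomic operations above (checkAndPut, incrementColumnValue) through HTable rather than through an HRegionInterface proxy. A hedged usage sketch; the row, family, qualifier, and values are examples:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AtomicOpsSketch {
      public static void demo(HTable table) throws IOException {
        byte[] row = Bytes.toBytes("row1");
        byte[] fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q");

        // Apply the Put only if f:q currently equals "old" (atomic check-then-put).
        Put put = new Put(row);
        put.add(fam, qual, Bytes.toBytes("new"));
        boolean applied = table.checkAndPut(row, fam, qual,
            Bytes.toBytes("old"), put);

        // Atomically add 1 to a long-encoded counter cell.
        long next = table.incrementColumnValue(row, fam, qual, 1L);
      }
    }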
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java Fri May 7 19:26:45 2010
@@ -35,7 +35,7 @@ public class TableRecordReader
implements RecordReader<ImmutableBytesWritable, Result> {
private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
-
+
/**
* Restart from survivable exceptions by creating a new scanner.
*
@@ -114,7 +114,7 @@ implements RecordReader<ImmutableBytesWr
}
public long getPos() {
-
+
// This should be the ordinal tuple in the range;
// not clear how to calculate...
return this.recordReaderImpl.getPos();
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Fri May 7 19:26:45 2010
@@ -92,7 +92,7 @@ public class TableRecordReaderImpl {
restart(startRow);
}
- byte[] getStartRow() {
+ byte[] getStartRow() {
return this.startRow;
}
/**
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Fri May 7 19:26:45 2010
@@ -28,7 +28,7 @@ import org.apache.hadoop.util.ProgramDri
public class Driver {
/**
* @param args
- * @throws Throwable
+ * @throws Throwable
*/
public static void main(String[] args) throws Throwable {
ProgramDriver pgd = new ProgramDriver();
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Fri May 7 19:26:45 2010
@@ -51,7 +51,7 @@ public class Export {
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
@@ -68,7 +68,7 @@ public class Export {
/**
* Sets up the actual job.
- *
+ *
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
@@ -115,7 +115,7 @@ public class Export {
/**
* Main entry point.
- *
+ *
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java Fri May 7 19:26:45 2010
@@ -40,31 +40,31 @@ public class GroupingTableMapper
extends TableMapper<ImmutableBytesWritable,Result> implements Configurable {
/**
- * JobConf parameter to specify the columns used to produce the key passed to
+ * JobConf parameter to specify the columns used to produce the key passed to
* collect from the map phase.
*/
public static final String GROUP_COLUMNS =
"hbase.mapred.groupingtablemap.columns";
-
+
/** The grouping columns. */
protected byte [][] columns;
/** The current configuration. */
private Configuration conf = null;
-
+
/**
- * Use this before submitting a TableMap job. It will appropriately set up
+ * Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table The table to be processed.
* @param scan The scan with the columns etc.
- * @param groupColumns A space separated list of columns used to form the
+ * @param groupColumns A space separated list of columns used to form the
* key used in collect.
* @param mapper The mapper class.
* @param job The current job.
* @throws IOException When setting up the job fails.
*/
@SuppressWarnings("unchecked")
- public static void initJob(String table, Scan scan, String groupColumns,
+ public static void initJob(String table, Scan scan, String groupColumns,
Class<? extends TableMapper> mapper, Job job) throws IOException {
TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
ImmutableBytesWritable.class, Result.class, job);
@@ -72,18 +72,18 @@ extends TableMapper<ImmutableBytesWritab
}
/**
- * Extract the grouping columns from value to construct a new key. Pass the
- * new key and value to reduce. If any of the grouping columns are not found
+ * Extract the grouping columns from value to construct a new key. Pass the
+ * new key and value to reduce. If any of the grouping columns are not found
* in the value, the record is skipped.
- *
- * @param key The current key.
+ *
+ * @param key The current key.
* @param value The current value.
- * @param context The current context.
+ * @param context The current context.
* @throws IOException When writing the record fails.
* @throws InterruptedException When the job is aborted.
*/
@Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
+ public void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
byte[][] keyVals = extractKeyValues(value);
if(keyVals != null) {
@@ -97,7 +97,7 @@ extends TableMapper<ImmutableBytesWritab
* null if any of the columns are not found.
* <p>
* Override this method if you want to deal with nulls differently.
- *
+ *
* @param r The current values.
* @return Array of byte values.
*/
@@ -107,7 +107,7 @@ extends TableMapper<ImmutableBytesWritab
int numCols = columns.length;
if (numCols > 0) {
for (KeyValue value: r.list()) {
- byte [] column = KeyValue.makeColumn(value.getFamily(),
+ byte [] column = KeyValue.makeColumn(value.getFamily(),
value.getQualifier());
for (int i = 0; i < numCols; i++) {
if (Bytes.equals(column, columns[i])) {
@@ -125,9 +125,9 @@ extends TableMapper<ImmutableBytesWritab
/**
* Create a key by concatenating multiple column values.
- * <p>
+ * <p>
* Override this function in order to produce different types of keys.
- *
+ *
* @param vals The current key/values.
* @return A key generated by concatenating multiple column values.
*/
@@ -151,7 +151,7 @@ extends TableMapper<ImmutableBytesWritab
/**
* Returns the current configuration.
- *
+ *
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
@@ -162,7 +162,7 @@ extends TableMapper<ImmutableBytesWritab
/**
* Sets the configuration. This is used to set up the grouping details.
- *
+ *
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
@@ -176,5 +176,5 @@ extends TableMapper<ImmutableBytesWritab
columns[i] = Bytes.toBytes(cols[i]);
}
}
-
+
}
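A hedged sketch of wiring up GroupingTableMapper via the initJob() shown above; the table, family, and qualifier names are example values, and the groupColumns string lists family:qualifier pairs separated by spaces:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.GroupingTableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;

    public class GroupingJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "grouping-example");
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("d"), Bytes.toBytes("a"));
        scan.addColumn(Bytes.toBytes("d"), Bytes.toBytes("b"));
        // Map output keys become the concatenated values of d:a and d:b.
        GroupingTableMapper.initJob("mytable", scan, "d:a d:b",
            GroupingTableMapper.class, job);
        job.waitForCompletion(true);
      }
    }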
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri May 7 19:26:45 2010
@@ -50,7 +50,7 @@ import org.mortbay.log.Log;
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- // Get the path of the temporary output file
+ // Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
Configuration conf = context.getConfiguration();
@@ -127,7 +127,7 @@ public class HFileOutputFormat extends F
}
/*
- * Data structure to hold a Writer and amount of data written on it.
+ * Data structure to hold a Writer and amount of data written on it.
*/
static class WriterLength {
long written = 0;
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java Fri May 7 19:26:45 2010
@@ -35,23 +35,23 @@ import org.apache.hadoop.mapreduce.Parti
* This is used to partition the output keys into groups of keys.
* Keys are grouped according to the regions that currently exist
  * so that each reducer fills a single region, and load is distributed.
- *
+ *
* @param <KEY> The type of the key.
* @param <VALUE> The type of the value.
*/
-public class HRegionPartitioner<KEY, VALUE>
+public class HRegionPartitioner<KEY, VALUE>
extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {
-
+
private final Log LOG = LogFactory.getLog(TableInputFormat.class);
private Configuration conf = null;
private HTable table;
- private byte[][] startKeys;
-
+ private byte[][] startKeys;
+
/**
- * Gets the partition number for a given key (hence record) given the total
+ * Gets the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
- *
+ *
    * <p>Typically a hash function on all or a subset of the key.</p>
*
* @param key The key to be partitioned.
@@ -80,7 +80,7 @@ implements Configurable {
if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
if (i >= numPartitions-1){
          // cover the case where we have fewer reducers than regions.
- return (Integer.toString(i).hashCode()
+ return (Integer.toString(i).hashCode()
& Integer.MAX_VALUE) % numPartitions;
}
return i;
@@ -92,7 +92,7 @@ implements Configurable {
/**
* Returns the current configuration.
- *
+ *
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
@@ -104,7 +104,7 @@ implements Configurable {
/**
* Sets the configuration. This is used to determine the start keys for the
* given table.
- *
+ *
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
@@ -114,7 +114,7 @@ implements Configurable {
this.conf = configuration;
try {
HBaseConfiguration.addHbaseResources(conf);
- this.table = new HTable(this.conf,
+ this.table = new HTable(this.conf,
configuration.get(TableOutputFormat.OUTPUT_TABLE));
} catch (IOException e) {
LOG.error(e);
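Hooking this partitioner into a job is a one-liner, assuming the four-argument initTableReducerJob() overload that accepts a partitioner class is available; IdentityTableReducer and "mytable" are example choices:

    import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionedWriteSketch {
      public static void configure(Job job) throws Exception {
        // Route each reducer's output to a single region by partitioning
        // on the table's region start keys.
        TableMapReduceUtil.initTableReducerJob("mytable",
            IdentityTableReducer.class, job, HRegionPartitioner.class);
      }
    }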
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java Fri May 7 19:26:45 2010
@@ -33,9 +33,9 @@ public class IdentityTableMapper
extends TableMapper<ImmutableBytesWritable, Result> {
/**
- * Use this before submitting a TableMap job. It will appropriately set up
+ * Use this before submitting a TableMap job. It will appropriately set up
* the job.
- *
+ *
* @param table The table name.
* @param scan The scan with the columns to scan.
* @param mapper The mapper class.
@@ -51,16 +51,16 @@ extends TableMapper<ImmutableBytesWritab
/**
* Pass the key, value to reduce.
- *
- * @param key The current key.
+ *
+ * @param key The current key.
* @param value The current value.
- * @param context The current context.
+ * @param context The current context.
* @throws IOException When writing the record fails.
* @throws InterruptedException When the job is aborted.
*/
- public void map(ImmutableBytesWritable key, Result value, Context context)
+ public void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
-
+
}
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java Fri May 7 19:26:45 2010
@@ -27,44 +27,44 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.OutputFormat;
/**
- * Convenience class that simply writes all values (which must be
- * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * Convenience class that simply writes all values (which must be
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
* {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
- * passed to it out to the configured HBase table. This works in combination
+ * passed to it out to the configured HBase table. This works in combination
* with {@link TableOutputFormat} which actually does the writing to HBase.<p>
- *
+ *
* Keys are passed along but ignored in TableOutputFormat. However, they can
* be used to control how your values will be divided up amongst the specified
* number of reducers. <p>
- *
- * You can also use the {@link TableMapReduceUtil} class to set up the two
+ *
+ * You can also use the {@link TableMapReduceUtil} class to set up the two
* classes in one step:
* <blockquote><code>
* TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
* </code></blockquote>
* This will also set the proper {@link TableOutputFormat} which is given the
- * <code>table</code> parameter. The
- * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * <code>table</code> parameter. The
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
* {@link org.apache.hadoop.hbase.client.Delete Delete} define the
* row and columns implicitly.
*/
-public class IdentityTableReducer
+public class IdentityTableReducer
extends TableReducer<Writable, Writable, Writable> {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
-
+
/**
- * Writes each given record, consisting of the row key and the given values,
+   * to the configured {@link OutputFormat}. It emits the row key and each
- * {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs.
- *
- * @param key The current row key.
- * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
+ * Writes each given record, consisting of the row key and the given values,
+ * to the configured {@link OutputFormat}. It is emitting the row key and each
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs.
+ *
+ * @param key The current row key.
+ * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
* row.
- * @param context The context of the reduce.
+ * @param context The context of the reduce.
* @throws IOException When writing the record fails.
* @throws InterruptedException When the job gets interrupted.
*/
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Fri May 7 19:26:45 2010
@@ -49,7 +49,7 @@ public class Import {
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
@@ -63,7 +63,7 @@ public class Import {
}
}
- private static Put resultToPut(ImmutableBytesWritable key, Result result)
+ private static Put resultToPut(ImmutableBytesWritable key, Result result)
throws IOException {
Put put = new Put(key.get());
for (KeyValue kv : result.raw()) {
@@ -75,13 +75,13 @@ public class Import {
/**
* Sets up the actual job.
- *
+ *
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
- public static Job createSubmittableJob(Configuration conf, String[] args)
+ public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path inputDir = new Path(args[1]);
@@ -109,7 +109,7 @@ public class Import {
/**
* Main entry point.
- *
+ *
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java Fri May 7 19:26:45 2010
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
* {@link Put} or a {@link Delete} instance. All tables must already exist, and
* all Puts and Deletes must reference only valid column families.
* </p>
- *
+ *
* <p>
* Write-ahead logging (HLog) for Puts can be disabled by setting
* {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
@@ -114,7 +114,7 @@ public class MultiTableOutputFormat exte
/**
* Writes an action (Put or Delete) to the specified table.
- *
+ *
* @param tableName
* the table being updated.
* @param action
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Fri May 7 19:26:45 2010
@@ -34,11 +34,11 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.GenericOptionsParser;
/**
- * A job with a just a map phase to count rows. Map outputs table rows IF the
+ * A job with just a map phase to count rows. Map outputs table rows IF the
* input row has columns that have content.
*/
public class RowCounter {
-
+
/** Name of this 'program'. */
static final String NAME = "rowcounter";
@@ -47,18 +47,18 @@ public class RowCounter {
*/
static class RowCounterMapper
extends TableMapper<ImmutableBytesWritable, Result> {
-
+
/** Counter enumeration to count the actual rows. */
public static enum Counters {ROWS}
/**
* Maps the data.
- *
+ *
* @param row The current table row key.
* @param values The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
@@ -76,13 +76,13 @@ public class RowCounter {
/**
* Sets up the actual job.
- *
+ *
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
- public static Job createSubmittableJob(Configuration conf, String[] args)
+ public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Job job = new Job(conf, NAME + "_" + tableName);
@@ -107,7 +107,7 @@ public class RowCounter {
scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
}
}
- }
+ }
// Second argument is the table name.
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
@@ -118,7 +118,7 @@ public class RowCounter {
/**
* Main entry point.
- *
+ *
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
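A hedged driver sketch using the createSubmittableJob() shown above; "mytable" and the d:q column are example values (trailing arguments are optional columns given as family:qualifier):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.RowCounter;
    import org.apache.hadoop.mapreduce.Job;

    public class RowCountSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HBaseConfiguration(); // assumes 0.20-era config setup
        Job job = RowCounter.createSubmittableJob(conf,
            new String[] { "mytable", "d:q" });
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }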
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java Fri May 7 19:26:45 2010
@@ -52,7 +52,7 @@ implements Configurable {
private byte [] endkey;
private byte [][] splits;
private int lastReduces = -1;
-
+
@Override
public int getPartition(final ImmutableBytesWritable key, final VALUE value,
final int reduces) {
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Fri May 7 19:26:45 2010
@@ -34,11 +34,11 @@ import org.apache.hadoop.util.StringUtil
/**
* Convert HBase tabular data into a format that is consumable by Map/Reduce.
*/
-public class TableInputFormat extends TableInputFormatBase
+public class TableInputFormat extends TableInputFormatBase
implements Configurable {
-
+
private final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
+
/** Job parameter that specifies the input table. */
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
/** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
@@ -61,13 +61,13 @@ implements Configurable {
public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
/** The number of rows for caching that will be passed to scanners. */
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
-
+
/** The configuration. */
private Configuration conf = null;
/**
* Returns the current configuration.
- *
+ *
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
@@ -79,7 +79,7 @@ implements Configurable {
/**
* Sets the configuration. This is used to set the details for the table to
* be scanned.
- *
+ *
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
@@ -93,9 +93,9 @@ implements Configurable {
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
-
+
Scan scan = null;
-
+
if (conf.get(SCAN) != null) {
try {
scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
@@ -105,22 +105,22 @@ implements Configurable {
} else {
try {
scan = new Scan();
-
+
if (conf.get(SCAN_COLUMNS) != null) {
scan.addColumns(conf.get(SCAN_COLUMNS));
}
-
- if (conf.get(SCAN_COLUMN_FAMILY) != null) {
+
+ if (conf.get(SCAN_COLUMN_FAMILY) != null) {
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
}
-
+
if (conf.get(SCAN_TIMESTAMP) != null) {
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
}
-
+
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
scan.setTimeRange(
- Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
+ Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
}
@@ -141,5 +141,5 @@ implements Configurable {
setScan(scan);
}
-
+
}
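The setConf() branch above builds a Scan purely from job properties when no
serialized scan is present. A minimal sketch of driving it that way; the
table "mytable" and family "info" are placeholders, and the visibility of
SCAN_COLUMN_FAMILY as a public constant is assumed (this hunk only shows it
being read):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class ScanFromConf {
      static void configure(Job job) {
        Configuration conf = job.getConfiguration();
        conf.set(TableInputFormat.INPUT_TABLE, "mytable");     // table to read
        conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "info"); // one family only
        conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");     // scanner caching
        job.setInputFormatClass(TableInputFormat.class);
      }
    }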
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Fri May 7 19:26:45 2010
@@ -40,8 +40,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.util.StringUtils;
/**
- * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
- * {@link Scan} instance that defines the input columns etc. Subclasses may use
+ * A base for {@link TableInputFormat}s. Receives a {@link HTable}, a
+ * {@link Scan} instance that defines the input columns etc. Subclasses may use
* other TableRecordReader implementations.
* <p>
* An example of a subclass:
@@ -69,7 +69,7 @@ import org.apache.hadoop.util.StringUtil
*/
public abstract class TableInputFormatBase
extends InputFormat<ImmutableBytesWritable, Result> {
-
+
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
/** Holds the details for the internal scanner. */
@@ -79,17 +79,17 @@ extends InputFormat<ImmutableBytesWritab
/** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null;
-
+
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
- *
+ *
* @param split The split to work with.
* @param context The current context.
* @return The newly created record reader.
* @throws IOException When creating the reader fails.
* @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
- * org.apache.hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.InputSplit,
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
@@ -124,7 +124,7 @@ extends InputFormat<ImmutableBytesWritab
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
- if (keys == null || keys.getFirst() == null ||
+ if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region.");
}
@@ -132,7 +132,7 @@ extends InputFormat<ImmutableBytesWritab
throw new IOException("No table was provided.");
}
int count = 0;
- List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+ List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue;
@@ -144,19 +144,19 @@ extends InputFormat<ImmutableBytesWritab
// determine if the given start and stop keys fall into the region
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
- (stopRow.length == 0 ||
+ (stopRow.length == 0 ||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
- byte[] splitStart = startRow.length == 0 ||
- Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+ byte[] splitStart = startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
keys.getFirst()[i] : startRow;
- byte[] splitStop = (stopRow.length == 0 ||
+ byte[] splitStop = (stopRow.length == 0 ||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
- keys.getSecond()[i].length > 0 ?
+ keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
InputSplit split = new TableSplit(table.getTableName(),
splitStart, splitStop, regionLocation);
splits.add(split);
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled())
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
}
}
@@ -209,7 +209,7 @@ extends InputFormat<ImmutableBytesWritab
/**
* Gets the scan defining the actual details like columns etc.
- *
+ *
* @return The internal scan instance.
*/
public Scan getScan() {
@@ -219,7 +219,7 @@ extends InputFormat<ImmutableBytesWritab
/**
* Sets the scan defining the actual details like columns etc.
- *
+ *
* @param scan The scan to set.
*/
public void setScan(Scan scan) {
@@ -229,7 +229,7 @@ extends InputFormat<ImmutableBytesWritab
/**
* Allows subclasses to set the {@link TableRecordReader}.
*
- * @param tableRecordReader A different {@link TableRecordReader}
+ * @param tableRecordReader A different {@link TableRecordReader}
* implementation.
*/
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
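The class javadoc above mentions an example subclass. A minimal sketch of
one, assuming the base class exposes a protected setHTable() to match the
public setScan() shown in this diff; the family "info" is a placeholder:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Hard-wires the table and scan instead of reading them from the job. */
    public class ExampleTIF extends TableInputFormatBase {
      public ExampleTIF(HTable table) {
        setHTable(table);                      // assumed protected setter
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("info")); // restrict the input columns
        setScan(scan);                         // public setter shown above
      }
    }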
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri May 7 19:26:45 2010
@@ -41,11 +41,11 @@ import org.apache.hadoop.conf.Configurat
*/
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
-
+
/**
- * Use this before submitting a TableMap job. It will appropriately set up
+ * Use this before submitting a TableMap job. It will appropriately set up
* the job.
- *
+ *
* @param table The table name to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
@@ -55,8 +55,8 @@ public class TableMapReduceUtil {
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
- Class<? extends TableMapper> mapper,
- Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, Job job) throws IOException {
job.setInputFormatClass(TableInputFormat.class);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
@@ -69,13 +69,13 @@ public class TableMapReduceUtil {
/**
* Writes the given scan into a Base64 encoded string.
- *
+ *
* @param scan The scan to write out.
* @return The scan saved in a Base64 encoded string.
* @throws IOException When writing the scan fails.
*/
static String convertScanToString(Scan scan) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(out);
scan.write(dos);
return Base64.encodeBytes(out.toByteArray());
@@ -83,7 +83,7 @@ public class TableMapReduceUtil {
/**
* Converts the given Base64 string back into a Scan instance.
- *
+ *
* @param base64 The scan details.
* @return The newly created Scan instance.
* @throws IOException When reading the scan instance fails.
@@ -95,15 +95,15 @@ public class TableMapReduceUtil {
scan.readFields(dis);
return scan;
}
-
+
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the Job.
- *
+ *
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust.
- * @throws IOException When determining the region count fails.
+ * @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job)
@@ -131,16 +131,16 @@ public class TableMapReduceUtil {
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the Job.
- *
+ *
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust.
- * @param partitioner Partitioner to use. Pass <code>null</code> to use
+ * @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @param quorumAddress Distant cluster to write to
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
- * @throws IOException When determining the region count fails.
+ * @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job,
@@ -177,17 +177,17 @@ public class TableMapReduceUtil {
job.setPartitionerClass(partitioner);
}
}
-
+
/**
- * Ensures that the given number of reduce tasks for the given job
- * configuration does not exceed the number of regions for the given table.
- *
+ * Ensures that the given number of reduce tasks for the given job
+ * configuration does not exceed the number of regions for the given table.
+ *
* @param table The table to get the region count for.
* @param job The current job to adjust.
* @throws IOException When retrieving the table details fails.
*/
- public static void limitNumReduceTasks(String table, Job job)
- throws IOException {
+ public static void limitNumReduceTasks(String table, Job job)
+ throws IOException {
HTable outputTable = new HTable(job.getConfiguration(), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions)
@@ -195,25 +195,25 @@ public class TableMapReduceUtil {
}
/**
- * Sets the number of reduce tasks for the given job configuration to the
- * number of regions the given table has.
- *
+ * Sets the number of reduce tasks for the given job configuration to the
+ * number of regions the given table has.
+ *
* @param table The table to get the region count for.
* @param job The current job to adjust.
* @throws IOException When retrieving the table details fails.
*/
- public static void setNumReduceTasks(String table, Job job)
- throws IOException {
+ public static void setNumReduceTasks(String table, Job job)
+ throws IOException {
HTable outputTable = new HTable(job.getConfiguration(), table);
int regions = outputTable.getRegionsInfo().size();
job.setNumReduceTasks(regions);
}
-
+
/**
* Sets the number of rows to return and cache with each scanner iteration.
* Higher caching values will enable faster mapreduce jobs at the expense of
* requiring more heap to contain the cached rows.
- *
+ *
* @param job The current job to adjust.
* @param batchSize The number of rows to return in batch with each scanner
* iteration.
@@ -221,5 +221,5 @@ public class TableMapReduceUtil {
public static void setScannerCaching(Job job, int batchSize) {
job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
}
-
+
}
\ No newline at end of file
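Taken together, the helpers above wire a whole table-in, table-out job. A
minimal sketch; the table names "source" and "target" are placeholders and
the mapper/reducer classes are supplied by the caller:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;

    public class JobWiring {
      static void wire(Job job, Scan scan,
          Class<? extends TableMapper> mapper,
          Class<? extends TableReducer> reducer) throws IOException {
        TableMapReduceUtil.initTableMapperJob("source", scan, mapper,
            ImmutableBytesWritable.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("target", reducer, job);
        TableMapReduceUtil.setScannerCaching(job, 500);        // rows per scanner trip
        TableMapReduceUtil.limitNumReduceTasks("target", job); // cap at region count
      }
    }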
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java Fri May 7 19:26:45 2010
@@ -24,9 +24,9 @@ import org.apache.hadoop.hbase.io.Immuta
import org.apache.hadoop.mapreduce.Mapper;
/**
- * Extends the base <code>Mapper</code> class to add the required input key
+ * Extends the base <code>Mapper</code> class to add the required input key
* and value classes.
- *
+ *
* @param <KEYOUT> The type of the key.
* @param <VALUEOUT> The type of the value.
* @see org.apache.hadoop.mapreduce.Mapper
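Since TableMapper pins the input types, a subclass only declares its output
types. A minimal sketch that emits a (row, cell count) pair per row; the
class name is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class CellCountMapper extends TableMapper<Text, IntWritable> {
      @Override
      public void map(ImmutableBytesWritable row, Result values, Context context)
      throws IOException, InterruptedException {
        // values.size() is the number of KeyValues returned for this row.
        context.write(new Text(row.get()), new IntWritable(values.size()));
      }
    }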
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Fri May 7 19:26:45 2010
@@ -38,9 +38,9 @@ import org.apache.hadoop.conf.Configurat
/**
* Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
- * while the output value <u>must</u> be either a {@link Put} or a
- * {@link Delete} instance.
- *
+ * while the output value <u>must</u> be either a {@link Put} or a
+ * {@link Delete} instance.
+ *
* @param <KEY> The type of the key. Ignored in this class.
*/
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
@@ -59,18 +59,18 @@ public class TableOutputFormat<KEY> exte
/**
* Writes the reducer output to an HBase table.
- *
+ *
* @param <KEY> The type of the key.
*/
- protected static class TableRecordWriter<KEY>
+ protected static class TableRecordWriter<KEY>
extends RecordWriter<KEY, Writable> {
-
+
/** The table to write to. */
private HTable table;
/**
* Instantiate a TableRecordWriter with an HBase HTable for writing.
- *
+ *
* @param table The table to write to.
*/
public TableRecordWriter(HTable table) {
@@ -79,37 +79,37 @@ public class TableOutputFormat<KEY> exte
/**
* Closes the writer, in this case flushing pending table commits.
- *
+ *
* @param context The context.
* @throws IOException When closing the writer fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public void close(TaskAttemptContext context)
+ public void close(TaskAttemptContext context)
throws IOException {
table.flushCommits();
}
/**
* Writes a key/value pair into the table.
- *
+ *
* @param key The key.
* @param value The value.
* @throws IOException When writing fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
*/
@Override
- public void write(KEY key, Writable value)
+ public void write(KEY key, Writable value)
throws IOException {
if (value instanceof Put) this.table.put(new Put((Put)value));
else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
else throw new IOException("Pass a Delete or a Put");
}
}
-
+
/**
* Creates a new record writer.
- *
+ *
* @param context The current task context.
* @return The newly created writer instance.
* @throws IOException When creating the writer fails.
@@ -118,7 +118,7 @@ public class TableOutputFormat<KEY> exte
*/
@Override
public RecordWriter<KEY, Writable> getRecordWriter(
- TaskAttemptContext context)
+ TaskAttemptContext context)
throws IOException, InterruptedException {
// expecting exactly one path
Configuration conf = new Configuration(context.getConfiguration());
@@ -150,9 +150,9 @@ public class TableOutputFormat<KEY> exte
/**
* Checks if the output target exists.
- *
+ *
* @param context The current context.
- * @throws IOException When the check fails.
+ * @throws IOException When the check fails.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
*/
@@ -160,12 +160,12 @@ public class TableOutputFormat<KEY> exte
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
// TODO Check if the table exists?
-
+
}
/**
* Returns the output committer.
- *
+ *
* @param context The current context.
* @return The committer.
* @throws IOException When creating the committer fails.
@@ -173,9 +173,9 @@ public class TableOutputFormat<KEY> exte
* @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableOutputCommitter();
}
-
+
}
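Because TableRecordWriter.write() above accepts only Put or Delete, a
reducer feeding this output format emits one of those. A minimal sketch,
assuming TableReducer's usual type parameters and placeholder family and
qualifier names:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class SumReducer
    extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
      @Override
      public void reduce(Text key, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) sum += c.get();
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
        context.write(null, put); // the key is ignored by TableOutputFormat
      }
    }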
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java Fri May 7 19:26:45 2010
@@ -30,14 +30,14 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
- * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
+ * Iterate over HBase table data, returning (ImmutableBytesWritable, Result)
* pairs.
*/
public class TableRecordReader
extends RecordReader<ImmutableBytesWritable, Result> {
-
+
private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
-
+
/**
* Restart from survivable exceptions by creating a new scanner.
*
@@ -51,7 +51,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Build the scanner. Not done in constructor to allow for extension.
*
- * @throws IOException When restarting the scan fails.
+ * @throws IOException When restarting the scan fails.
*/
public void init() throws IOException {
this.recordReaderImpl.init();
@@ -59,7 +59,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Sets the HBase table.
- *
+ *
* @param htable The {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
@@ -68,7 +68,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Sets the scan defining the actual details like columns etc.
- *
+ *
* @param scan The scan to set.
*/
public void setScan(Scan scan) {
@@ -77,7 +77,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Closes the split.
- *
+ *
* @see org.apache.hadoop.mapreduce.RecordReader#close()
*/
@Override
@@ -87,7 +87,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Returns the current key.
- *
+ *
* @return The current key.
* @throws IOException
* @throws InterruptedException When the job is aborted.
@@ -101,7 +101,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Returns the current value.
- *
+ *
* @return The current value.
* @throws IOException When the value is faulty.
* @throws InterruptedException When the job is aborted.
@@ -114,13 +114,13 @@ extends RecordReader<ImmutableBytesWrita
/**
* Initializes the reader.
- *
+ *
* @param inputsplit The split to work with.
* @param context The current task context.
* @throws IOException When setting up the reader fails.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.RecordReader#initialize(
- * org.apache.hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.InputSplit,
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
@@ -131,7 +131,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* Positions the record reader to the next record.
- *
+ *
* @return <code>true</code> if there was another record.
* @throws IOException When reading the record failed.
* @throws InterruptedException When the job was aborted.
@@ -144,7 +144,7 @@ extends RecordReader<ImmutableBytesWrita
/**
* The current progress of the record reader through its data.
- *
+ *
* @return A number between 0.0 and 1.0, the fraction of the data read.
* @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
*/