Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC
svn commit: r1446147 [11/35] - in /hbase/branches/hbase-7290v2: ./ bin/
conf/ dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/...
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Feb 14 12:58:12 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
/** Overhead of the structure itself */
private long overhead;
+ /** Where to send victims (blocks evicted from the cache) */
+ private BucketCache victimHandler = null;
+
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
CachedBlock cb = map.get(cacheKey);
if(cb == null) {
if (!repeat) stats.miss(caching);
+ if (victimHandler != null)
+ return victimHandler.getBlock(cacheKey, caching, repeat);
return null;
}
stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
return cb.getBuffer();
}
+ /**
+ * Whether the cache contains the block with the specified cacheKey
+ * @param cacheKey the cache key of the block
+ * @return true if the cache contains the block
+ */
+ public boolean containsBlock(BlockCacheKey cacheKey) {
+ return map.containsKey(cacheKey);
+ }
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
CachedBlock cb = map.get(cacheKey);
if (cb == null) return false;
- evictBlock(cb);
+ evictBlock(cb, false);
return true;
}
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
++numEvicted;
}
}
+ if (victimHandler != null) {
+ numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+ }
return numEvicted;
}
- protected long evictBlock(CachedBlock block) {
+ /**
+ * Evict the block; it will be cached by the victim handler if one exists
+ * and the block may be read again later
+ * @param block the block to evict
+ * @param evictedByEvictionProcess true if the given block is evicted by
+ * EvictionThread
+ * @return the heap size of evicted block
+ */
+ protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
updateSizeMetrics(block, true);
elements.decrementAndGet();
stats.evicted();
+ if (evictedByEvictionProcess && victimHandler != null) {
+ boolean wait = getCurrentSize() < acceptableSize();
+ boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+ victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+ inMemory, wait);
+ }
return block.heapSize();
}
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
CachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
- freedBytes += evictBlock(cb);
+ freedBytes += evictBlock(cb, true);
if (freedBytes >= toFree) {
return freedBytes;
}
@@ -532,6 +564,16 @@ public class LruBlockCache implements Bl
if(this.overflow() == that.overflow()) return 0;
return this.overflow() > that.overflow() ? 1 : -1;
}
+
+ @Override
+ public boolean equals(Object that) {
+ if (!(that instanceof BlockBucket)) {
+ return false;
+ }
+
+ return compareTo((BlockBucket) that) == 0;
+ }
+
}
/**
@@ -625,13 +667,13 @@ public class LruBlockCache implements Bl
public void evict() {
synchronized(this) {
- this.notify(); // FindBugs NN_NAKED_NOTIFY
+ this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
}
- void shutdown() {
+ synchronized void shutdown() {
this.go = false;
- interrupt();
+ this.notifyAll();
}
/**
@@ -693,7 +735,7 @@ public class LruBlockCache implements Bl
}
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
- (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+ (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
+ ClassSize.OBJECT);
@@ -762,6 +804,8 @@ public class LruBlockCache implements Bl
}
public void shutdown() {
+ if (victimHandler != null)
+ victimHandler.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -812,4 +856,9 @@ public class LruBlockCache implements Bl
return counts;
}
+ public void setVictimCache(BucketCache handler) {
+ assert victimHandler == null;
+ victimHandler = handler;
+ }
+
}
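
For readers following the LruBlockCache change: the new victimHandler field turns the on-heap LRU cache into an L1 tier that demotes blocks evicted by the eviction thread into a BucketCache L2, and falls through to it on read misses. A minimal, self-contained sketch of that flow, with toy types standing in for LruBlockCache/BucketCache (the names mirror the patch, but nothing below is the real API):

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of the victim-cache pattern in the hunks above: background
    // evictions are demoted to L2, and L1 read misses consult L2.
    public class VictimCacheDemo {
      static class L2 {
        final Map<String, byte[]> store = new HashMap<String, byte[]>();
        void cacheBlockWithWait(String key, byte[] block) { store.put(key, block); }
        byte[] getBlock(String key) { return store.get(key); }
      }
      static class L1 {
        final Map<String, byte[]> store = new HashMap<String, byte[]>();
        L2 victimHandler; // wired via setVictimCache() in the patch
        byte[] getBlock(String key) {
          byte[] b = store.get(key);
          if (b == null && victimHandler != null) {
            return victimHandler.getBlock(key); // fall through on a miss
          }
          return b;
        }
        void evictBlock(String key, boolean evictedByEvictionProcess) {
          byte[] b = store.remove(key);
          // Only eviction-thread victims are demoted; explicit evictions just drop.
          if (b != null && evictedByEvictionProcess && victimHandler != null) {
            victimHandler.cacheBlockWithWait(key, b);
          }
        }
      }
      public static void main(String[] args) {
        L1 l1 = new L1();
        l1.victimHandler = new L2();
        l1.store.put("blk", new byte[] { 1 });
        l1.evictBlock("blk", true);                     // demoted to L2
        System.out.println(l1.getBlock("blk") != null); // true: served from L2
      }
    }

Note also that LruBlockCache.shutdown() now cascades to the victim handler, and the eviction thread is stopped by setting go to false and calling notifyAll() rather than interrupt().
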
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java Thu Feb 14 12:58:12 2013
@@ -70,4 +70,4 @@ public class BlockingRpcCallback<R> impl
}
return result;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java Thu Feb 14 12:58:12 2013
@@ -25,11 +25,8 @@ import java.io.IOException;
* but is only used for logging on the server side, etc.
*/
public class CallerDisconnectedException extends IOException {
+ private static final long serialVersionUID = 1L;
public CallerDisconnectedException(String msg) {
super(msg);
}
-
- private static final long serialVersionUID = 1L;
-
-
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Thu Feb 14 12:58:12 2013
@@ -70,4 +70,4 @@ public interface Delayable {
* @throws IOException
*/
public void endDelayThrowing(Throwable t) throws IOException;
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu Feb 14 12:58:12 2013
@@ -28,6 +28,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -41,6 +42,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,6 +55,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -103,9 +106,11 @@ import com.google.protobuf.Message.Build
@InterfaceAudience.Private
public class HBaseClient {
- public static final Log LOG = LogFactory
- .getLog("org.apache.hadoop.ipc.HBaseClient");
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
+ private static final Map<String, Method> methodInstances =
+ new ConcurrentHashMap<String, Method>();
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -121,7 +126,6 @@ public class HBaseClient {
protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
protected String clusterId;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,12 +190,13 @@ public class HBaseClient {
}
public static class FailedServerException extends IOException {
+ private static final long serialVersionUID = -4744376109431464127L;
+
public FailedServerException(String s) {
super(s);
}
}
-
/**
* set the ping interval value in configuration
*
@@ -229,36 +234,11 @@ public class HBaseClient {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
}
- /**
- * Increment this client's reference count
- *
- */
- synchronized void incCount() {
- refCount++;
- }
-
- /**
- * Decrement this client's reference count
- *
- */
- synchronized void decCount() {
- refCount--;
- }
-
- /**
- * Return if this client has no reference
- *
- * @return true if this client has no reference; false otherwise
- */
- synchronized boolean isZeroReference() {
- return refCount==0;
- }
-
/** A call waiting for a value. */
protected class Call {
- final int id; // call id
- final RpcRequestBody param; // rpc request object
- Message value; // value, null if error
+ final int id; // call id
+ final RpcRequestBody param; // rpc request object
+ Message value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
@@ -302,6 +282,7 @@ public class HBaseClient {
return this.startTime;
}
}
+
protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
static {
@@ -335,9 +316,12 @@ public class HBaseClient {
private int reloginMaxBackoff; // max pause before relogin on sasl failure
// currently active calls
- protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
- protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
- protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
+ protected final ConcurrentSkipListMap<Integer, Call> calls =
+ new ConcurrentSkipListMap<Integer, Call>();
+ protected final AtomicLong lastActivity =
+ new AtomicLong(); // last I/O activity time
+ protected final AtomicBoolean shouldCloseConnection =
+ new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
Connection(ConnectionId remoteId) throws IOException {
@@ -414,16 +398,14 @@ public class HBaseClient {
return null;
}
UserInformation.Builder userInfoPB = UserInformation.newBuilder();
- if (ugi != null) {
- if (authMethod == AuthMethod.KERBEROS) {
- // Send effective user for Kerberos auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- } else if (authMethod == AuthMethod.SIMPLE) {
- //Send both effective user and real user for simple auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- if (ugi.getRealUser() != null) {
- userInfoPB.setRealUser(ugi.getRealUser().getUserName());
- }
+ if (authMethod == AuthMethod.KERBEROS) {
+ // Send effective user for Kerberos auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ } else if (authMethod == AuthMethod.SIMPLE) {
+ //Send both effective user and real user for simple auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ if (ugi.getRealUser() != null) {
+ userInfoPB.setRealUser(ugi.getRealUser().getUserName());
}
}
return userInfoPB.build();
@@ -845,11 +827,17 @@ public class HBaseClient {
start();
return;
}
- } catch (IOException e) {
+ } catch (Throwable t) {
failedServers.addToFailedServers(remoteId.address);
- markClosed(e);
+ IOException e = null;
+ if (t instanceof IOException) {
+ e = (IOException)t;
+ markClosed(e);
+ } else {
+ e = new IOException("Coundn't set up IO Streams", t);
+ markClosed(e);
+ }
close();
-
throw e;
}
}
@@ -959,6 +947,24 @@ public class HBaseClient {
}
}
+
+ private Method getMethod(Class<? extends IpcProtocol> protocol,
+ String methodName) {
+ Method method = methodInstances.get(methodName);
+ if (method != null) {
+ return method;
+ }
+ Method[] methods = protocol.getMethods();
+ for (Method m : methods) {
+ if (m.getName().equals(methodName)) {
+ m.setAccessible(true);
+ methodInstances.put(methodName, m);
+ return m;
+ }
+ }
+ return null;
+ }
+
/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
@@ -990,9 +996,9 @@ public class HBaseClient {
if (status == Status.SUCCESS) {
Message rpcResponseType;
try {
- rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
- ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
- call.param.getMethodName()));
+ rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+ getMethod(remoteId.getProtocol(),
+ call.param.getMethodName()));
} catch (Exception e) {
throw new RuntimeException(e); //local exception
}
@@ -1270,7 +1276,7 @@ public class HBaseClient {
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Message call(RpcRequestBody param, InetSocketAddress addr,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
Call call = new Call(param);
@@ -1317,7 +1323,6 @@ public class HBaseClient {
* @param exception the relevant exception
* @return an exception to throw
*/
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
protected IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
@@ -1340,25 +1345,9 @@ public class HBaseClient {
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
- * contains nulls for calls that timed out or errored.
- * @param params RpcRequestBody parameters
- * @param addresses socket addresses
- * @return RpcResponseBody[]
- * @throws IOException e
- * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
- */
- @Deprecated
- public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
- throws IOException, InterruptedException {
- return call(params, addresses, null, null);
- }
-
- /** Makes a set of calls in parallel. Each parameter is sent to the
- * corresponding address. When all values are available, or have timed out
- * or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket)
throws IOException, InterruptedException {
if (addresses.length == 0) return new RpcResponseBody[0];
@@ -1393,7 +1382,7 @@ public class HBaseClient {
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
protected Connection getConnection(InetSocketAddress addr,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout,
Call call)
@@ -1436,11 +1425,10 @@ public class HBaseClient {
final InetSocketAddress address;
final User ticket;
final int rpcTimeout;
- Class<? extends VersionedProtocol> protocol;
+ Class<? extends IpcProtocol> protocol;
private static final int PRIME = 16777619;
- ConnectionId(InetSocketAddress address,
- Class<? extends VersionedProtocol> protocol,
+ ConnectionId(InetSocketAddress address, Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout) {
this.protocol = protocol;
@@ -1453,7 +1441,7 @@ public class HBaseClient {
return address;
}
- Class<? extends VersionedProtocol> getProtocol() {
+ Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
@@ -1479,4 +1467,4 @@ public class HBaseClient {
(ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
}
}
-}
+}
\ No newline at end of file
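
The getMethod() helper added above memoizes reflective lookups so each RPC response does not rescan the protocol's methods. A self-contained sketch of the same idiom (note the patch keys its shared static map by method name alone, which assumes method names are unique across all IpcProtocol interfaces):

    import java.lang.reflect.Method;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class MethodCacheDemo {
      private static final Map<String, Method> methodInstances =
          new ConcurrentHashMap<String, Method>();

      static Method getMethod(Class<?> protocol, String methodName) {
        Method method = methodInstances.get(methodName);
        if (method != null) {
          return method; // cache hit: no reflection scan
        }
        for (Method m : protocol.getMethods()) {
          if (m.getName().equals(methodName)) {
            m.setAccessible(true);
            methodInstances.put(methodName, m); // benign race with other callers
            return m;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(getMethod(Comparable.class, "compareTo"));
        System.out.println(getMethod(Comparable.class, "compareTo")); // served from cache
      }
    }
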
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Thu Feb 14 12:58:12 2013
@@ -46,18 +46,17 @@ import java.nio.channels.WritableByteCha
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@@ -68,28 +67,29 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,20 +97,20 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
import org.cliffc.high_scale_lib.Counter;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+// Uses Writables when doing sasl
/** A server for an IPC service. IPC calls take a single Protobuf message as a
* parameter, and return a single Protobuf message as their value. A service runs on
@@ -169,22 +169,18 @@ public abstract class HBaseServer implem
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
- // For generated protocol classes which doesn't have VERSION field
- private static final Map<Class<?>, Long>
- PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
-
- private static final Map<String, Class<? extends VersionedProtocol>>
- PROTOCOL_CACHE =
- new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
+ private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
+ new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
- static Class<? extends VersionedProtocol> getProtocolClass(
+ @SuppressWarnings("unchecked")
+ static Class<? extends IpcProtocol> getProtocolClass(
String protocolName, Configuration conf)
throws ClassNotFoundException {
- Class<? extends VersionedProtocol> protocol =
+ Class<? extends IpcProtocol> protocol =
PROTOCOL_CACHE.get(protocolName);
if (protocol == null) {
- protocol = (Class<? extends VersionedProtocol>)
+ protocol = (Class<? extends IpcProtocol>)
conf.getClassByName(protocolName);
PROTOCOL_CACHE.put(protocolName, protocol);
}
@@ -192,7 +188,7 @@ public abstract class HBaseServer implem
}
/** Returns the server instance called under or null. May be called under
- * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+ * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
* and under protobuf methods of parameters and return values.
* Permits applications to access the server context.
* @return HBaseServer
@@ -263,8 +259,6 @@ public abstract class HBaseServer implem
protected int highPriorityLevel; // what level a high priority call is at
- private volatile int responseQueueLen; // size of response queue for this server
-
protected final List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
//maintain a list
@@ -278,7 +272,7 @@ public abstract class HBaseServer implem
protected BlockingQueue<Call> replicationQueue;
private int numOfReplicationHandlers = 0;
private Handler[] replicationHandlers = null;
-
+
protected HBaseRPCErrorHandler errorHandler = null;
/**
@@ -358,7 +352,7 @@ public abstract class HBaseServer implem
if (errorClass != null) {
this.isError = true;
}
-
+
ByteBufferOutputStream buf = null;
if (value != null) {
buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
@@ -460,7 +454,7 @@ public abstract class HBaseServer implem
public synchronized boolean isReturnValueDelayed() {
return this.delayReturnValue;
}
-
+
@Override
public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
if (!connection.channel.isOpen()) {
@@ -1000,7 +994,6 @@ public abstract class HBaseServer implem
return true;
}
if (!call.response.hasRemaining()) {
- responseQueueLen--;
call.connection.decRpcCount();
//noinspection RedundantIfStatement
if (numElements == 1) { // last call fully processes.
@@ -1070,7 +1063,6 @@ public abstract class HBaseServer implem
void doRespond(Call call) throws IOException {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
- responseQueueLen++;
boolean doRegister = false;
synchronized (call.connection.responseQueue) {
@@ -1120,7 +1112,7 @@ public abstract class HBaseServer implem
protected String hostAddress;
protected int remotePort;
ConnectionHeader header;
- Class<? extends VersionedProtocol> protocol;
+ Class<? extends IpcProtocol> protocol;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
@@ -1324,7 +1316,7 @@ public abstract class HBaseServer implem
LOG.debug("SASL server context established. Authenticated client: "
+ user + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
- }
+ }
metrics.authenticationSuccess();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
@@ -1437,7 +1429,7 @@ public abstract class HBaseServer implem
}
}
if (dataLength < 0) {
- throw new IllegalArgumentException("Unexpected data length "
+ throw new IllegalArgumentException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength);
@@ -1758,7 +1750,7 @@ public abstract class HBaseServer implem
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
- status.setConnection(call.connection.getHostAddress(),
+ status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
if (LOG.isDebugEnabled())
@@ -2019,11 +2011,12 @@ public abstract class HBaseServer implem
}
return handlers;
}
-
+
public SecretManager<? extends TokenIdentifier> getSecretManager() {
return this.secretManager;
}
+ @SuppressWarnings("unchecked")
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
}
@@ -2051,7 +2044,7 @@ public abstract class HBaseServer implem
}
}
}
-
+
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
@@ -2110,7 +2103,7 @@ public abstract class HBaseServer implem
connection.getProtocol());
}
authManager.authorize(user != null ? user : null,
- protocol, getConf(), addr);
+ protocol, getConf(), addr);
}
}
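
getProtocolClass() above uses the same check-then-populate idiom over a ConcurrentHashMap; the unsynchronized get/put race is benign because both threads resolve the same Class. A sketch of the equivalent lookup, using Class.forName as a stand-in for conf.getClassByName and putIfAbsent to make the single-winner behavior explicit:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ProtocolCacheDemo {
      private static final Map<String, Class<?>> PROTOCOL_CACHE =
          new ConcurrentHashMap<String, Class<?>>();

      static Class<?> getProtocolClass(String protocolName) throws ClassNotFoundException {
        Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
        if (protocol == null) {
          protocol = Class.forName(protocolName);
          // Keep the first resolution if two threads raced to this point.
          PROTOCOL_CACHE.putIfAbsent(protocolName, protocol);
        }
        return protocol;
      }

      public static void main(String[] args) throws Exception {
        System.out.println(getProtocolClass("java.lang.String"));
      }
    }
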
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java Thu Feb 14 12:58:12 2013
@@ -18,26 +18,20 @@
package org.apache.hadoop.hbase.ipc;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import java.io.IOException;
-
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Thu Feb 14 12:58:12 2013
@@ -19,14 +19,11 @@
package org.apache.hadoop.hbase.ipc;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceAudience.Private
public class MetricsHBaseServer {
- private static Log LOG = LogFactory.getLog(MetricsHBaseServer.class);
private MetricsHBaseServerSource source;
public MetricsHBaseServer(String serverName, MetricsHBaseServerWrapper wrapper) {
@@ -69,4 +66,4 @@ public class MetricsHBaseServer {
public MetricsHBaseServerSource getMetricsSource() {
return source;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java Thu Feb 14 12:58:12 2013
@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
import java.net.InetAddress;
@@ -90,7 +91,7 @@ public class RequestContext {
*/
public static void set(User user,
InetAddress remoteAddress,
- Class<? extends VersionedProtocol> protocol) {
+ Class<? extends IpcProtocol> protocol) {
RequestContext ctx = instance.get();
ctx.user = user;
ctx.remoteAddress = remoteAddress;
@@ -111,12 +112,12 @@ public class RequestContext {
private User user;
private InetAddress remoteAddress;
- private Class<? extends VersionedProtocol> protocol;
+ private Class<? extends IpcProtocol> protocol;
// indicates we're within a RPC request invocation
private boolean inRequest;
private RequestContext(User user, InetAddress remoteAddr,
- Class<? extends VersionedProtocol> protocol) {
+ Class<? extends IpcProtocol> protocol) {
this.user = user;
this.remoteAddress = remoteAddr;
this.protocol = protocol;
@@ -130,11 +131,11 @@ public class RequestContext {
return remoteAddress;
}
- public Class<? extends VersionedProtocol> getProtocol() {
+ public Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
public boolean isInRequest() {
return inRequest;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Thu Feb 14 12:58:12 2013
@@ -23,16 +23,18 @@ import com.google.common.base.Function;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import java.io.IOException;
import java.net.InetSocketAddress;
-/**
- */
@InterfaceAudience.Private
public interface RpcServer {
+ // TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'?
+ // Also, the call takes a RpcRequestBody, an already composed combination of
+ // rpc Request and metadata. Should disentangle metadata and rpc Request Message.
void setSocketSendBufSize(int size);
@@ -45,12 +47,12 @@ public interface RpcServer {
InetSocketAddress getListenerAddress();
/** Called for each call.
- * @param param writable parameter
+ * @param param the Protobuf request parameter
* @param receiveTime time
- * @return Message
+ * @return the Protobuf response Message
* @throws java.io.IOException e
*/
- Message call(Class<? extends VersionedProtocol> protocol,
+ Message call(Class<? extends IpcProtocol> protocol,
RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
@@ -62,9 +64,8 @@ public interface RpcServer {
void startThreads();
-
/**
* Returns the metrics instance for reporting RPC call statistics
*/
MetricsHBaseServer getMetrics();
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java Thu Feb 14 12:58:12 2013
@@ -121,4 +121,13 @@ public class ServerRpcController impleme
public boolean failedOnException() {
return serviceException != null;
}
+
+ /**
+ * Throws an IOException back out if one is currently stored.
+ */
+ public void checkFailed() throws IOException {
+ if (failedOnException()) {
+ throw getFailedOn();
+ }
+ }
}
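
The new checkFailed() helper rounds out the usual client-side pattern for coprocessor endpoint calls, where the generated stub reports server-side failures through the controller instead of throwing. A hedged sketch of that sequence, in which service and request stand for a hypothetical generated endpoint stub and its request message (SumResponse is likewise illustrative, not part of this patch):

    ServerRpcController controller = new ServerRpcController();
    BlockingRpcCallback<SumResponse> callback = new BlockingRpcCallback<SumResponse>();
    service.getSum(controller, request, callback);  // invoke the endpoint
    SumResponse response = callback.get();          // blocks until done() fires
    controller.checkFailed();                       // rethrows any stored IOException
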
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java Thu Feb 14 12:58:12 2013
@@ -106,7 +106,20 @@ public class TableSplit implements Input
Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
}
+ @Override
public int compareTo(TableSplit o) {
return Bytes.compareTo(getStartRow(), o.getStartRow());
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TableSplit)) {
+ return false;
+ }
+ TableSplit other = (TableSplit)o;
+ return Bytes.equals(m_tableName, other.m_tableName) &&
+ Bytes.equals(m_startRow, other.m_startRow) &&
+ Bytes.equals(m_endRow, other.m_endRow) &&
+ m_regionLocation.equals(other.m_regionLocation);
+ }
}
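
A caveat on this hunk: equals(Object) is added without a matching hashCode() override (the BlockBucket.equals added earlier in this commit has the same gap), so equal TableSplits may hash to different buckets. A hashCode consistent with the fields equals() compares might look like the following sketch, which is not part of the patch:

    @Override
    public int hashCode() {
      int result = Bytes.hashCode(m_tableName);
      result = 31 * result + Bytes.hashCode(m_startRow);
      result = 31 * result + Bytes.hashCode(m_endRow);
      result = 31 * result + m_regionLocation.hashCode();
      return result;
    }
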
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Thu Feb 14 12:58:12 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
int batching = conf.getInt(EXPORT_BATCHING, -1);
if (batching != -1){
- try{
+ try {
s.setBatch(batching);
- } catch (RuntimeException e) {
- LOG.error("Batching could not be set", e);
+ } catch (IncompatibleFilterException e) {
+ LOG.error("Batching could not be set", e);
}
}
LOG.info("versions=" + versions + ", starttime=" + startTime +
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Feb 14 12:58:12 2013
@@ -49,17 +49,18 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -82,7 +83,7 @@ import org.apache.hadoop.mapreduce.lib.p
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
- TimeRangeTracker trt = new TimeRangeTracker();
+ private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
private static final String DATABLOCK_ENCODING_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
@@ -106,6 +107,7 @@ public class HFileOutputFormat extends F
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+ final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
final HFileDataBlockEncoder encoder;
@@ -166,7 +168,6 @@ public class HFileOutputFormat extends F
// we now have the proper HLog writer. full steam ahead
kv.updateLatestStamp(this.now);
- trt.includeTimestamp(kv);
wl.writer.append(kv);
wl.written += length;
@@ -187,9 +188,9 @@ public class HFileOutputFormat extends F
this.rollRequested = false;
}
- /* Create a new HFile.Writer.
+ /* Create a new StoreFile.Writer.
* @param family
- * @return A WriterLength, containing a new HFile.Writer.
+ * @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +199,28 @@ public class HFileOutputFormat extends F
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
- wl.writer = HFile.getWriterFactoryNoCache(conf)
- .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
- .withBlockSize(blocksize)
- .withCompression(compression)
- .withComparator(KeyValue.KEY_COMPARATOR)
+ String bloomTypeStr = bloomTypeMap.get(family);
+ BloomType bloomType = BloomType.NONE;
+ if (bloomTypeStr != null) {
+ bloomType = BloomType.valueOf(bloomTypeStr);
+ }
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+ .withOutputDir(familydir)
+ .withCompression(AbstractHFileWriter.compressionByName(compression))
+ .withBloomType(bloomType)
+ .withComparator(KeyValue.COMPARATOR)
.withDataBlockEncoder(encoder)
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
- .create();
+ .build();
+
this.writers.put(family, wl);
return wl;
}
- private void close(final HFile.Writer w) throws IOException {
+ private void close(final StoreFile.Writer w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +230,7 @@ public class HFileOutputFormat extends F
Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
- w.appendFileInfo(StoreFile.TIMERANGE_KEY,
- WritableUtils.toByteArray(trt));
+ w.appendTrackedTimestampsToMetadata();
w.close();
}
}
@@ -241,7 +249,7 @@ public class HFileOutputFormat extends F
*/
static class WriterLength {
long written = 0;
- HFile.Writer writer = null;
+ StoreFile.Writer writer = null;
}
/**
@@ -359,7 +367,8 @@ public class HFileOutputFormat extends F
// Set compression algorithms based on column families
configureCompression(table, conf);
-
+ configureBloomType(table, conf);
+
TableMapReduceUtil.addDependencyJars(job);
LOG.info("Incremental table output configured.");
}
@@ -375,25 +384,39 @@ public class HFileOutputFormat extends F
* algorithm
*/
static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
- Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
- String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
- for (String familyConf : compressionConf.split("&")) {
+ return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+ }
+
+ private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+ return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+ }
+
+ /**
+ * Run inside the task to deserialize column family to given conf value map.
+ *
+ * @param conf
+ * @param confName
+ * @return a map of column family to the given configuration value
+ */
+ private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+ String confVal = conf.get(confName, "");
+ for (String familyConf : confVal.split("&")) {
String[] familySplit = familyConf.split("=");
if (familySplit.length != 2) {
continue;
}
-
try {
- compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
throw new AssertionError(e);
}
}
- return compressionMap;
+ return confValMap;
}
-
+
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
@@ -403,6 +426,8 @@ public class HFileOutputFormat extends F
* @throws IOException
* on failure to read column family descriptors
*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
static void configureCompression(HTable table, Configuration conf) throws IOException {
StringBuilder compressionConfigValue = new StringBuilder();
HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -423,4 +448,35 @@ public class HFileOutputFormat extends F
// Get rid of the last ampersand
conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
}
+
+ /**
+ * Serialize column family to bloom type map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ static void configureBloomType(HTable table, Configuration conf) throws IOException {
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ StringBuilder bloomTypeConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ bloomTypeConfigValue.append('&');
+ }
+ bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ bloomTypeConfigValue.append('=');
+ BloomType bt = familyDescriptor.getBloomFilterType();
+ String bloomType = (bt == null)
+ ? HColumnDescriptor.DEFAULT_BLOOMFILTER : bt.toString();
+ bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+ }
+ conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+ }
}
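
Both the compression map and the new bloom-type map ride in the job Configuration as a single URL-encoded "family=value&family=value" string. A self-contained round-trip of that wire format, mirroring configureBloomType on the submit side and createFamilyConfValueMap inside the task:

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.util.Map;
    import java.util.TreeMap;

    public class FamilyConfDemo {
      public static void main(String[] args) throws UnsupportedEncodingException {
        // Serialize, as the configure* methods do at job-submission time.
        String conf = URLEncoder.encode("cf1", "UTF-8") + "=" + URLEncoder.encode("ROW", "UTF-8")
            + "&" + URLEncoder.encode("cf2", "UTF-8") + "=" + URLEncoder.encode("NONE", "UTF-8");

        // Deserialize, as createFamilyConfValueMap does inside the task.
        Map<String, String> map = new TreeMap<String, String>();
        for (String familyConf : conf.split("&")) {
          String[] kv = familyConf.split("=");
          if (kv.length != 2) continue; // skip malformed entries, as the patch does
          map.put(URLDecoder.decode(kv[0], "UTF-8"), URLDecoder.decode(kv[1], "UTF-8"));
        }
        System.out.println(map); // {cf1=ROW, cf2=NONE}
      }
    }
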
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java Thu Feb 14 12:58:12 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,10 +48,10 @@ import org.apache.hadoop.mapreduce.TaskA
*/
@InterfaceAudience.Public
public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
- private static Log LOG = LogFactory.getLog(HLogInputFormat.class);
+ private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
- public static String START_TIME_KEY = "hlog.start.time";
- public static String END_TIME_KEY = "hlog.end.time";
+ public static final String START_TIME_KEY = "hlog.start.time";
+ public static final String END_TIME_KEY = "hlog.end.time";
/**
* {@link InputSplit} for {@link HLog} files. Each split represent
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Thu Feb 14 12:58:12 2013
@@ -19,27 +19,41 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
/**
* Import data written by {@link Export}.
@@ -47,9 +61,15 @@ import org.apache.hadoop.util.GenericOpt
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Import {
+ private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+ final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+ // Optional filter to use for mappers
+ private static Filter filter;
/**
* A mapper that just writes out KeyValues.
@@ -72,6 +92,10 @@ public class Import {
throws IOException {
try {
for (KeyValue kv : value.raw()) {
+ kv = filterKv(kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+
context.write(row, convertKv(kv, cfRenameMap));
}
} catch (InterruptedException e) {
@@ -82,6 +106,7 @@ public class Import {
@Override
public void setup(Context context) {
cfRenameMap = createCfRenameMap(context.getConfiguration());
+ filter = instantiateFilter(context.getConfiguration());
}
}
@@ -91,6 +116,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
+ private UUID clusterId;
/**
* @param row The current table row key.
@@ -116,6 +142,10 @@ public class Import {
Put put = null;
Delete delete = null;
for (KeyValue kv : result.raw()) {
+ kv = filterKv(kv);
+ // skip if we filter it out
+ if (kv == null) continue;
+
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (kv.isDelete()) {
@@ -131,17 +161,106 @@ public class Import {
}
}
if (put != null) {
+ put.setClusterId(clusterId);
context.write(key, put);
}
if (delete != null) {
+ delete.setClusterId(clusterId);
context.write(key, delete);
}
}
@Override
public void setup(Context context) {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
+ cfRenameMap = createCfRenameMap(conf);
+ filter = instantiateFilter(conf);
+
+ try {
+ HConnection connection = HConnectionManager.getConnection(conf);
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+ clusterId = zkHelper.getUUIDForCluster(zkw);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.error("Problem connecting to ZooKeper during task setup", e);
+ } catch (KeeperException e) {
+ LOG.error("Problem reading ZooKeeper data during task setup", e);
+ } catch (IOException e) {
+ LOG.error("Problem setting up task", e);
+ }
+
+ }
+ }
+
+ /**
+ * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}),
+ * optionally excluding them from the job output
+ * @param conf {@link Configuration} from which to load the filter
+ * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+ * @throws IllegalArgumentException if the filter is misconfigured
+ */
+ private static Filter instantiateFilter(Configuration conf) {
+ // get the filter, if it was configured
+ Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filterClass == null) {
+ LOG.debug("No configured filter class, accepting all keyvalues.");
+ return null;
+ }
+ LOG.debug("Attempting to create filter:" + filterClass);
+
+ try {
+ Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+ return (Filter) m.invoke(null, getFilterArgs(conf));
+ } catch (IllegalAccessException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (SecurityException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+ ArrayList<byte[]> args = new ArrayList<byte[]>();
+ String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+ // guard against an unset key, which would otherwise NPE below
+ if (sargs == null) {
+ return args;
+ }
+ for (String arg : sargs) {
+ // all the filters' instantiation methods expect quoted args since they are coming from
+ // the shell, so add the quotes here, though it shouldn't really be needed :-/
+ args.add(Bytes.toBytes("'" + arg + "'"));
+ }
+ return args;
+ }
+
+ /**
+ * Attempt to filter out the KeyValue
+ * @param kv {@link KeyValue} on which to apply the filter
+ * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+ * {@link KeyValue}
+ */
+ private static KeyValue filterKv(KeyValue kv) {
+ // apply the filter and skip this kv if the filter doesn't apply
+ if (filter != null) {
+ Filter.ReturnCode code = filter.filterKeyValue(kv);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Filter returned: " + code);
+ }
+ // if it's not an accept type, then skip this kv
+ if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+ .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping key: " + kv + " from filter decision: " + code);
+ }
+ return null;
+ }
}
+ return kv;
}
// helper: create a new KeyValue based on CF rename map
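For illustration, a minimal sketch of a Filter that plugs into the reflective hook above. The class name and single-prefix argument handling are hypothetical (not part of this commit); it assumes the trunk FilterBase, whose defaults leave filterKeyValue as the only method that must be overridden, and it strips the single quotes that getFilterArgs adds around each argument:

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical example: keep only KeyValues whose row starts with a prefix. */
public class RowPrefixImportFilter extends FilterBase {
  private final byte[] prefix;

  private RowPrefixImportFilter(byte[] prefix) {
    this.prefix = prefix;
  }

  /** Static hook that Import#instantiateFilter looks up by reflection. */
  public static Filter createFilterFromArguments(ArrayList<byte[]> args) {
    // getFilterArgs wraps each argument in single quotes; strip them again
    byte[] quoted = args.get(0);
    return new RowPrefixImportFilter(Arrays.copyOfRange(quoted, 1, quoted.length - 1));
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    // only INCLUDE/INCLUDE_AND_NEXT_COL survive Import#filterKv
    return Bytes.startsWith(kv.getRow(), prefix) ? ReturnCode.INCLUDE : ReturnCode.SKIP;
  }
}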
@@ -223,13 +342,33 @@ public class Import {
}
conf.set(CF_RENAME_PROP, sb.toString());
}
-
+
+ /**
+ * Add a Filter to be instantiated on import
+ * @param conf Configuration to update (will be passed to the job)
+ * @param clazz {@link Filter} subclass to instantiate in the mapper tasks.
+ * @param args List of arguments to pass to the filter on instantiation
+ */
+ public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+ List<String> args) {
+ conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+ // build the param string for the key
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < args.size(); i++) {
+ String arg = args.get(i);
+ builder.append(arg);
+ if (i != args.size() - 1) {
+ builder.append(",");
+ }
+ }
+ conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+ }
/**
* Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
+ * @param conf The current configuration.
+ * @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
@@ -242,6 +381,17 @@ public class Import {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+ // make sure we get the filter in the jars
+ try {
+ Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filter != null) {
+ TableMapReduceUtil.addDependencyJars(conf, filter);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName);
@@ -274,6 +424,15 @@ public class Import {
System.err.println("By default Import will load data directly into HBase. To instead generate");
System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err
+ .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+ System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+ System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+ System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ + CF_RENAME_PROP + " property. Futher, filters will only use the"
+ + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+ + "the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
+ " -Dmapred.reduce.tasks.speculative.execution=false");
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Feb 14 12:58:12 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.HB
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -71,11 +72,13 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -91,19 +94,30 @@ import com.google.common.util.concurrent
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
- private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
- static AtomicLong regionCount = new AtomicLong(0);
+ private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+ static final AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
private Configuration cfg;
- public static String NAME = "completebulkload";
- private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+ public static final String NAME = "completebulkload";
+ private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
private boolean assignSeqIds;
- public LoadIncrementalHFiles(Configuration conf) throws Exception {
+ private boolean useSecure;
+ private Token<?> userToken;
+ private String bulkToken;
+
+ //package private for testing
+ LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
super(conf);
this.cfg = conf;
this.hbAdmin = new HBaseAdmin(conf);
+ //added for testing; null means derive the secure flag from the conf
+ this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
+ }
+
+ public LoadIncrementalHFiles(Configuration conf) throws Exception {
+ this(conf, null);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
}
@@ -215,6 +229,18 @@ public class LoadIncrementalHFiles exten
return;
}
+ //If using secure bulk load
+ //prepare staging directory and token
+ if(useSecure) {
+ FileSystem fs = FileSystem.get(cfg);
+ //This condition is here for unit testing,
+ //since delegation tokens don't work in a mini cluster
+ if(User.isSecurityEnabled()) {
+ userToken = fs.getDelegationToken("renewer");
+ }
+ bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
+ }
+
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
// need to reload split keys each iteration.
@@ -243,6 +269,18 @@ public class LoadIncrementalHFiles exten
}
} finally {
+ if(useSecure) {
+ if(userToken != null) {
+ try {
+ userToken.cancel(cfg);
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel HDFS delegation token.", e);
+ }
+ }
+ if(bulkToken != null) {
+ new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+ }
+ }
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
@@ -476,11 +514,47 @@ public class LoadIncrementalHFiles exten
tableName, first) {
@Override
public Boolean call() throws Exception {
- LOG.debug("Going to connect to server " + location + " for row "
- + Bytes.toStringBinary(row));
- byte[] regionName = location.getRegionInfo().getRegionName();
- return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
- assignSeqIds);
+ SecureBulkLoadClient secureClient = null;
+ boolean success = false;
+
+ try {
+ LOG.debug("Going to connect to server " + location + " for row "
+ + Bytes.toStringBinary(row));
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ if(!useSecure) {
+ success = ProtobufUtil.bulkLoadHFile(server, famPaths, regionName, assignSeqIds);
+ } else {
+ HTable table = new HTable(conn.getConfiguration(), tableName);
+ secureClient = new SecureBulkLoadClient(table);
+ success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
+ }
+ return success;
+ } finally {
+ //Best effort copying of files that might not have been imported
+ //from the staging directory back to original location
+ //in user directory
+ if(secureClient != null && !success) {
+ FileSystem fs = FileSystem.get(cfg);
+ for(Pair<byte[], String> el : famPaths) {
+ Path hfileStagingPath = null;
+ Path hfileOrigPath = new Path(el.getSecond());
+ try {
+ hfileStagingPath = new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+ hfileOrigPath.getName());
+ if(fs.rename(hfileStagingPath, hfileOrigPath)) {
+ LOG.debug("Moved back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ } else if (fs.exists(hfileStagingPath)) {
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ }
+ } catch(Exception ex) {
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath, ex);
+ }
+ }
+ }
+ }
}
};
@@ -626,11 +700,11 @@ public class LoadIncrementalHFiles exten
}
HTableDescriptor htd = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = null;
+ HColumnDescriptor hcd;
// Add column families
// Build a set of keys
- byte[][] keys = null;
+ byte[][] keys;
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (FileStatus stat : familyDirStatuses) {
@@ -667,10 +741,10 @@ public class LoadIncrementalHFiles exten
" last=" + Bytes.toStringBinary(last));
// To eventually infer start key-end key boundaries
- Integer value = map.containsKey(first)?(Integer)map.get(first):0;
+ Integer value = map.containsKey(first)? map.get(first):0;
map.put(first, value+1);
- value = map.containsKey(last)?(Integer)map.get(last):0;
+ value = map.containsKey(last)? map.get(last):0;
map.put(last, value-1);
} finally {
reader.close();
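For context, a hedged sketch of driving the loader after this change (table name and output path are hypothetical). The public constructor derives the secure flag from User.isHBaseSecurityEnabled(conf), so existing callers are untouched; only tests should need the package-private Boolean overload:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // secure mode (delegation token, staging directory, SecureBulkLoadClient)
    // is selected automatically when HBase security is enabled
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    HTable table = new HTable(conf, "mytable");
    try {
      loader.doBulkLoad(new Path("/bulk/output"), table);
    } finally {
      table.close();
    }
  }
}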
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Thu Feb 14 12:58:12 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.Immuta
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -219,7 +220,8 @@ extends InputFormat<ImmutableBytesWritab
private String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
- hostName = DNS.reverseDns(ipAddress, this.nameServer);
+ hostName = Strings.domainNamePointerToHostName(
+ DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
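The reason for wrapping the lookup in Strings.domainNamePointerToHostName is that reverse DNS returns an absolute domain name pointer with a trailing dot, which then fails to match the host names region servers advertise when computing split locality. A small hedged illustration (host name invented):

import org.apache.hadoop.hbase.util.Strings;

public class ReverseDnsExample {
  public static void main(String[] args) {
    // DNS.reverseDns may hand back "rs1.example.com." (note the trailing dot);
    // the helper strips it so the name compares equal to the server's host name
    String ptr = "rs1.example.com.";
    String host = Strings.domainNamePointerToHostName(ptr);
    System.out.println(host); // rs1.example.com
  }
}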
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Thu Feb 14 12:58:12 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
Delete del = null;
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
- // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+ // filtering HLog meta entries
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Feb 14 12:58:12 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
+import com.google.common.collect.LinkedHashMultimap;
+
/**
* Manages and performs region assignment.
* <p>
@@ -162,8 +162,10 @@ public class AssignmentManager extends Z
* that ServerShutdownHandler can be fully enabled and re-assign regions
* of dead servers. So that when re-assignment happens, AssignmentManager
* has proper region states.
+ *
+ * Protected to ease testing.
*/
- final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+ protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
/**
* Constructs a new assignment manager.
@@ -610,7 +612,7 @@ public class AssignmentManager extends Z
*/
private void handleRegion(final RegionTransition rt, int expectedVersion) {
if (rt == null) {
- LOG.warn("Unexpected NULL input " + rt);
+ LOG.warn("Unexpected NULL input for RegionTransition rt");
return;
}
final ServerName sn = rt.getServerName();
@@ -1059,13 +1061,27 @@ public class AssignmentManager extends Z
ZKUtil.listChildrenAndWatchForNewChildren(
watcher, watcher.assignmentZNode);
if (children != null) {
+ Stat stat = new Stat();
for (String child : children) {
// if region is in transition, we already have a watch
// on it, so no need to watch it again. So, as I know for now,
// this is needed to watch splitting nodes only.
if (!regionStates.isRegionInTransition(child)) {
- ZKUtil.watchAndCheckExists(watcher,
- ZKUtil.joinZNode(watcher.assignmentZNode, child));
+ stat.setVersion(0);
+ byte[] data = ZKAssign.getDataAndWatch(watcher,
+ ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
+ if (data != null && stat.getVersion() > 0) {
+ try {
+ RegionTransition rt = RegionTransition.parseFrom(data);
+
+ //See HBASE-7551, handle splitting too, in case we miss the node change event
+ if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
+ handleRegion(rt, stat.getVersion());
+ }
+ } catch (DeserializationException de) {
+ LOG.error("error getting data for " + child, de);
+ }
+ }
}
}
}
@@ -1461,6 +1477,7 @@ public class AssignmentManager extends Z
return;
}
// This never happens. Currently regionserver close always return true.
+ // TODO: this can now happen (0.96) if there is an exception in a coprocessor
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
} catch (Throwable t) {
@@ -2633,11 +2650,11 @@ public class AssignmentManager extends Z
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
- boolean isCarryingRoot(ServerName serverName) {
+ public boolean isCarryingRoot(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
}
- boolean isCarryingMeta(ServerName serverName) {
+ public boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Thu Feb 14 12:58:12 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ServerNam
/**
* Run bulk assign. Does one RCP per regionserver passing a
- * batch of regions using {@link SingleServerBulkAssigner}.
+ * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
*/
@InterfaceAudience.Private
public class GeneralBulkAssigner extends BulkAssigner {