You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC

svn commit: r1446147 [11/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Feb 14 12:58:12 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
   /** Overhead of the structure itself */
   private long overhead;
 
+  /** Where to send victims (blocks evicted from the cache) */
+  private BucketCache victimHandler = null;
+
   /**
    * Default constructor.  Specify maximum size and expected average block
    * size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
     CachedBlock cb = map.get(cacheKey);
     if(cb == null) {
       if (!repeat) stats.miss(caching);
+      if (victimHandler != null)
+        return victimHandler.getBlock(cacheKey, caching, repeat);
       return null;
     }
     stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
     return cb.getBuffer();
   }
 
+  /**
+   * Checks whether the cache contains the block with the specified cacheKey.
+   * @param cacheKey key of the block to look up
+   * @return true if the cache contains the block
+   */
+  public boolean containsBlock(BlockCacheKey cacheKey) {
+    return map.containsKey(cacheKey);
+  }
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
     CachedBlock cb = map.get(cacheKey);
     if (cb == null) return false;
-    evictBlock(cb);
+    evictBlock(cb, false);
     return true;
   }
 
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
           ++numEvicted;
       }
     }
+    if (victimHandler != null) {
+      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+    }
     return numEvicted;
   }
 
-  protected long evictBlock(CachedBlock block) {
+  /**
+   * Evict the block. If a victim handler is set and the block was evicted by
+   * the eviction process, the block is handed to the victim handler to cache.
+   * @param block
+   * @param evictedByEvictionProcess true if the given block is evicted by
+   *          EvictionThread
+   * @return the heap size of evicted block
+   */
+  protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
     map.remove(block.getCacheKey());
     updateSizeMetrics(block, true);
     elements.decrementAndGet();
     stats.evicted();
+    if (evictedByEvictionProcess && victimHandler != null) {
+      boolean wait = getCurrentSize() < acceptableSize();
+      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+          inMemory, wait);
+    }
     return block.heapSize();
   }
 
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
       CachedBlock cb;
       long freedBytes = 0;
       while ((cb = queue.pollLast()) != null) {
-        freedBytes += evictBlock(cb);
+        freedBytes += evictBlock(cb, true);
         if (freedBytes >= toFree) {
           return freedBytes;
         }
@@ -532,6 +564,16 @@ public class LruBlockCache implements Bl
       if(this.overflow() == that.overflow()) return 0;
       return this.overflow() > that.overflow() ? 1 : -1;
     }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null || !(that instanceof BlockBucket)){
+        return false;
+      }
+
+      return compareTo(( BlockBucket)that) == 0;
+    }
+
   }
 
   /**
@@ -625,13 +667,13 @@ public class LruBlockCache implements Bl
 
     public void evict() {
       synchronized(this) {
-        this.notify(); // FindBugs NN_NAKED_NOTIFY
+        this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
       }
     }
 
-    void shutdown() {
+    synchronized void shutdown() {
       this.go = false;
-      interrupt();
+      this.notifyAll();
     }
 
     /**
@@ -693,7 +735,7 @@ public class LruBlockCache implements Bl
   }
 
   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
-      (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+      (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
       + ClassSize.OBJECT);
 
@@ -762,6 +804,8 @@ public class LruBlockCache implements Bl
   }
 
   public void shutdown() {
+    if (victimHandler != null)
+      victimHandler.shutdown();
     this.scheduleThreadPool.shutdown();
     for (int i = 0; i < 10; i++) {
       if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -812,4 +856,9 @@ public class LruBlockCache implements Bl
     return counts;
   }
 
+  public void setVictimCache(BucketCache handler) {
+    assert victimHandler == null;
+    victimHandler = handler;
+  }
+
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java Thu Feb 14 12:58:12 2013
@@ -70,4 +70,4 @@ public class BlockingRpcCallback<R> impl
     }
     return result;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java Thu Feb 14 12:58:12 2013
@@ -25,11 +25,8 @@ import java.io.IOException;
  * but is only used for logging on the server side, etc.
  */
 public class CallerDisconnectedException extends IOException {
+  private static final long serialVersionUID = 1L;
   public CallerDisconnectedException(String msg) {
     super(msg);
   }
-
-  private static final long serialVersionUID = 1L;
-
-  
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Thu Feb 14 12:58:12 2013
@@ -70,4 +70,4 @@ public interface Delayable {
    * @throws IOException
    */
   public void endDelayThrowing(Throwable t) throws IOException;
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu Feb 14 12:58:12 2013
@@ -28,6 +28,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -41,6 +42,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,6 +55,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -103,9 +106,11 @@ import com.google.protobuf.Message.Build
 @InterfaceAudience.Private
 public class HBaseClient {
 
-  public static final Log LOG = LogFactory
-      .getLog("org.apache.hadoop.ipc.HBaseClient");
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
   protected final PoolMap<ConnectionId, Connection> connections;
+  private static final Map<String, Method> methodInstances =
+      new ConcurrentHashMap<String, Method>();
 
   protected int counter;                            // counter for call ids
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -121,7 +126,6 @@ public class HBaseClient {
   protected FailedServers failedServers;
 
   protected final SocketFactory socketFactory;           // how to create sockets
-  private int refCount = 1;
   protected String clusterId;
 
   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,12 +190,13 @@ public class HBaseClient {
   }
 
   public static class FailedServerException extends IOException {
+    private static final long serialVersionUID = -4744376109431464127L;
+
     public FailedServerException(String s) {
       super(s);
     }
   }
 
-
   /**
    * set the ping interval value in configuration
    *
@@ -229,36 +234,11 @@ public class HBaseClient {
     return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
   }
 
-  /**
-   * Increment this client's reference count
-   *
-   */
-  synchronized void incCount() {
-    refCount++;
-  }
-
-  /**
-   * Decrement this client's reference count
-   *
-   */
-  synchronized void decCount() {
-    refCount--;
-  }
-
-  /**
-   * Return if this client has no reference
-   *
-   * @return true if this client has no reference; false otherwise
-   */
-  synchronized boolean isZeroReference() {
-    return refCount==0;
-  }
-
   /** A call waiting for a value. */
   protected class Call {
-    final int id;                                       // call id
-    final RpcRequestBody param;                         // rpc request object
-    Message value;                               // value, null if error
+    final int id;                                 // call id
+    final RpcRequestBody param;                   // rpc request object
+    Message value;                                // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
     long startTime;
@@ -302,6 +282,7 @@ public class HBaseClient {
       return this.startTime;
     }
   }
+
   protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
       new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
   static {
@@ -335,9 +316,12 @@ public class HBaseClient {
     private int reloginMaxBackoff; // max pause before relogin on sasl failure
 
     // currently active calls
-    protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
-    protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
-    protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
+    protected final ConcurrentSkipListMap<Integer, Call> calls =
+      new ConcurrentSkipListMap<Integer, Call>();
+    protected final AtomicLong lastActivity =
+      new AtomicLong(); // last I/O activity time
+    protected final AtomicBoolean shouldCloseConnection =
+      new AtomicBoolean();  // indicate if the connection is closed
     protected IOException closeException; // close reason
 
     Connection(ConnectionId remoteId) throws IOException {
@@ -414,16 +398,14 @@ public class HBaseClient {
         return null;
       }
       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
-      if (ugi != null) {
-        if (authMethod == AuthMethod.KERBEROS) {
-          // Send effective user for Kerberos auth
-          userInfoPB.setEffectiveUser(ugi.getUserName());
-        } else if (authMethod == AuthMethod.SIMPLE) {
-          //Send both effective user and real user for simple auth
-          userInfoPB.setEffectiveUser(ugi.getUserName());
-          if (ugi.getRealUser() != null) {
-            userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-          }
+      if (authMethod == AuthMethod.KERBEROS) {
+        // Send effective user for Kerberos auth
+        userInfoPB.setEffectiveUser(ugi.getUserName());
+      } else if (authMethod == AuthMethod.SIMPLE) {
+        //Send both effective user and real user for simple auth
+        userInfoPB.setEffectiveUser(ugi.getUserName());
+        if (ugi.getRealUser() != null) {
+          userInfoPB.setRealUser(ugi.getRealUser().getUserName());
         }
       }
       return userInfoPB.build();
@@ -845,11 +827,17 @@ public class HBaseClient {
           start();
           return;
         }
-      } catch (IOException e) {
+      } catch (Throwable t) {
         failedServers.addToFailedServers(remoteId.address);
-        markClosed(e);
+        IOException e = null;
+        if (t instanceof IOException) {
+          e = (IOException)t;
+          markClosed(e);
+        } else {
+          e = new IOException("Coundn't set up IO Streams", t);
+          markClosed(e);
+        }
         close();
-
         throw e;
       }
     }
@@ -959,6 +947,24 @@ public class HBaseClient {
       }
     }
 
+
+    private Method getMethod(Class<? extends IpcProtocol> protocol,
+        String methodName) {
+      Method method = methodInstances.get(methodName);
+      if (method != null) {
+        return method;
+      }
+      Method[] methods = protocol.getMethods();
+      for (Method m : methods) {
+        if (m.getName().equals(methodName)) {
+          m.setAccessible(true);
+          methodInstances.put(methodName, m);
+          return m;
+        }
+      }
+      return null;
+    }
+
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
      */
@@ -990,9 +996,9 @@ public class HBaseClient {
         if (status == Status.SUCCESS) {
           Message rpcResponseType;
           try {
-            rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
-                ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
-                    call.param.getMethodName()));
+            rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+                getMethod(remoteId.getProtocol(),
+                          call.param.getMethodName()));
           } catch (Exception e) {
             throw new RuntimeException(e); //local exception
           }
@@ -1270,7 +1276,7 @@ public class HBaseClient {
    * Throws exceptions if there are network problems or if the remote code
    * threw an exception. */
   public Message call(RpcRequestBody param, InetSocketAddress addr,
-                       Class<? extends VersionedProtocol> protocol,
+                       Class<? extends IpcProtocol> protocol,
                        User ticket, int rpcTimeout)
       throws InterruptedException, IOException {
     Call call = new Call(param);
@@ -1317,7 +1323,6 @@ public class HBaseClient {
    * @param exception the relevant exception
    * @return an exception to throw
    */
-  @SuppressWarnings({"ThrowableInstanceNeverThrown"})
   protected IOException wrapException(InetSocketAddress addr,
                                          IOException exception) {
     if (exception instanceof ConnectException) {
@@ -1340,25 +1345,9 @@ public class HBaseClient {
   /** Makes a set of calls in parallel.  Each parameter is sent to the
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
-   * contains nulls for calls that timed out or errored.
-   * @param params RpcRequestBody parameters
-   * @param addresses socket addresses
-   * @return  RpcResponseBody[]
-   * @throws IOException e
-   * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
-   */
-  @Deprecated
-  public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
-    throws IOException, InterruptedException {
-    return call(params, addresses, null, null);
-  }
-
-  /** Makes a set of calls in parallel.  Each parameter is sent to the
-   * corresponding address.  When all values are available, or have timed out
-   * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
   public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
-                         Class<? extends VersionedProtocol> protocol,
+                         Class<? extends IpcProtocol> protocol,
                          User ticket)
       throws IOException, InterruptedException {
     if (addresses.length == 0) return new RpcResponseBody[0];
@@ -1393,7 +1382,7 @@ public class HBaseClient {
   /* Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   protected Connection getConnection(InetSocketAddress addr,
-                                   Class<? extends VersionedProtocol> protocol,
+                                   Class<? extends IpcProtocol> protocol,
                                    User ticket,
                                    int rpcTimeout,
                                    Call call)
@@ -1436,11 +1425,10 @@ public class HBaseClient {
     final InetSocketAddress address;
     final User ticket;
     final int rpcTimeout;
-    Class<? extends VersionedProtocol> protocol;
+    Class<? extends IpcProtocol> protocol;
     private static final int PRIME = 16777619;
 
-    ConnectionId(InetSocketAddress address,
-        Class<? extends VersionedProtocol> protocol,
+    ConnectionId(InetSocketAddress address, Class<? extends IpcProtocol> protocol,
         User ticket,
         int rpcTimeout) {
       this.protocol = protocol;
@@ -1453,7 +1441,7 @@ public class HBaseClient {
       return address;
     }
 
-    Class<? extends VersionedProtocol> getProtocol() {
+    Class<? extends IpcProtocol> getProtocol() {
       return protocol;
     }
 
@@ -1479,4 +1467,4 @@ public class HBaseClient {
              (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Thu Feb 14 12:58:12 2013
@@ -46,18 +46,17 @@ import java.nio.channels.WritableByteCha
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -68,28 +67,29 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,20 +97,20 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
 import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+// Uses Writables when doing SASL
 
 /** A client for an IPC service.  IPC calls take a single Protobuf message as a
  * parameter, and return a single Protobuf message as their value.  A service runs on
@@ -169,22 +169,18 @@ public abstract class HBaseServer implem
     new ThreadLocal<RpcServer>();
   private volatile boolean started = false;
 
-  // For generated protocol classes which doesn't have VERSION field
-  private static final Map<Class<?>, Long>
-    PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
-
-  private static final Map<String, Class<? extends VersionedProtocol>>
-      PROTOCOL_CACHE =
-      new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
+  private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
+      new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
 
-  static Class<? extends VersionedProtocol> getProtocolClass(
+  @SuppressWarnings("unchecked")
+  static Class<? extends IpcProtocol> getProtocolClass(
       String protocolName, Configuration conf)
   throws ClassNotFoundException {
-    Class<? extends VersionedProtocol> protocol =
+    Class<? extends IpcProtocol> protocol =
         PROTOCOL_CACHE.get(protocolName);
 
     if (protocol == null) {
-      protocol = (Class<? extends VersionedProtocol>)
+      protocol = (Class<? extends IpcProtocol>)
           conf.getClassByName(protocolName);
       PROTOCOL_CACHE.put(protocolName, protocol);
     }
@@ -192,7 +188,7 @@ public abstract class HBaseServer implem
   }
 
   /** Returns the server instance called under or null.  May be called under
-   * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+   * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
    * and under protobuf methods of parameters and return values.
    * Permits applications to access the server context.
    * @return HBaseServer
@@ -263,8 +259,6 @@ public abstract class HBaseServer implem
 
   protected int highPriorityLevel;  // what level a high priority call is at
 
-  private volatile int responseQueueLen; // size of response queue for this server
-
   protected final List<Connection> connectionList =
     Collections.synchronizedList(new LinkedList<Connection>());
   //maintain a list
@@ -278,7 +272,7 @@ public abstract class HBaseServer implem
   protected BlockingQueue<Call> replicationQueue;
   private int numOfReplicationHandlers = 0;
   private Handler[] replicationHandlers = null;
-  
+
   protected HBaseRPCErrorHandler errorHandler = null;
 
   /**
@@ -358,7 +352,7 @@ public abstract class HBaseServer implem
       if (errorClass != null) {
         this.isError = true;
       }
- 
+
       ByteBufferOutputStream buf = null;
       if (value != null) {
         buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
@@ -460,7 +454,7 @@ public abstract class HBaseServer implem
     public synchronized boolean isReturnValueDelayed() {
       return this.delayReturnValue;
     }
-    
+
     @Override
     public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
       if (!connection.channel.isOpen()) {
@@ -1000,7 +994,6 @@ public abstract class HBaseServer implem
             return true;
           }
           if (!call.response.hasRemaining()) {
-            responseQueueLen--;
             call.connection.decRpcCount();
             //noinspection RedundantIfStatement
             if (numElements == 1) {    // last call fully processes.
@@ -1070,7 +1063,6 @@ public abstract class HBaseServer implem
     void doRespond(Call call) throws IOException {
       // set the serve time when the response has to be sent later
       call.timestamp = System.currentTimeMillis();
-      responseQueueLen++;
 
       boolean doRegister = false;
       synchronized (call.connection.responseQueue) {
@@ -1120,7 +1112,7 @@ public abstract class HBaseServer implem
     protected String hostAddress;
     protected int remotePort;
     ConnectionHeader header;
-    Class<? extends VersionedProtocol> protocol;
+    Class<? extends IpcProtocol> protocol;
     protected UserGroupInformation user = null;
     private AuthMethod authMethod;
     private boolean saslContextEstablished;
@@ -1324,7 +1316,7 @@ public abstract class HBaseServer implem
             LOG.debug("SASL server context established. Authenticated client: "
               + user + ". Negotiated QoP is "
               + saslServer.getNegotiatedProperty(Sasl.QOP));
-          }          
+          }
           metrics.authenticationSuccess();
           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
           saslContextEstablished = true;
@@ -1437,7 +1429,7 @@ public abstract class HBaseServer implem
             }
           }
           if (dataLength < 0) {
-            throw new IllegalArgumentException("Unexpected data length " 
+            throw new IllegalArgumentException("Unexpected data length "
                 + dataLength + "!! from " + getHostAddress());
           }
           data = ByteBuffer.allocate(dataLength);
@@ -1758,7 +1750,7 @@ public abstract class HBaseServer implem
           status.pause("Waiting for a call");
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
           status.setStatus("Setting up call");
-          status.setConnection(call.connection.getHostAddress(), 
+          status.setConnection(call.connection.getHostAddress(),
               call.connection.getRemotePort());
 
           if (LOG.isDebugEnabled())
@@ -2019,11 +2011,12 @@ public abstract class HBaseServer implem
     }
     return handlers;
   }
-  
+
   public SecretManager<? extends TokenIdentifier> getSecretManager() {
     return this.secretManager;
   }
 
+  @SuppressWarnings("unchecked")
   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
   }
@@ -2051,7 +2044,7 @@ public abstract class HBaseServer implem
       }
     }
   }
-  
+
   /** Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
    *  See {@link #stop()}.
@@ -2110,7 +2103,7 @@ public abstract class HBaseServer implem
                                          connection.getProtocol());
       }
       authManager.authorize(user != null ? user : null,
-          protocol, getConf(), addr);
+        protocol, getConf(), addr);
     }
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java Thu Feb 14 12:58:12 2013
@@ -18,26 +18,20 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 
-import java.io.IOException;
-
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
 
 /**
  * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Thu Feb 14 12:58:12 2013
@@ -19,14 +19,11 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 
 @InterfaceAudience.Private
 public class MetricsHBaseServer {
-  private static Log LOG = LogFactory.getLog(MetricsHBaseServer.class);
   private MetricsHBaseServerSource source;
 
   public MetricsHBaseServer(String serverName, MetricsHBaseServerWrapper wrapper) {
@@ -69,4 +66,4 @@ public class MetricsHBaseServer {
   public MetricsHBaseServerSource getMetricsSource() {
     return source;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java Thu Feb 14 12:58:12 2013
@@ -21,6 +21,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.security.User;
 
 import java.net.InetAddress;
@@ -90,7 +91,7 @@ public class RequestContext {
    */
   public static void set(User user,
       InetAddress remoteAddress,
-      Class<? extends VersionedProtocol> protocol) {
+      Class<? extends IpcProtocol> protocol) {
     RequestContext ctx = instance.get();
     ctx.user = user;
     ctx.remoteAddress = remoteAddress;
@@ -111,12 +112,12 @@ public class RequestContext {
 
   private User user;
   private InetAddress remoteAddress;
-  private Class<? extends VersionedProtocol> protocol;
+  private Class<? extends IpcProtocol> protocol;
   // indicates we're within a RPC request invocation
   private boolean inRequest;
 
   private RequestContext(User user, InetAddress remoteAddr,
-      Class<? extends VersionedProtocol> protocol) {
+      Class<? extends IpcProtocol> protocol) {
     this.user = user;
     this.remoteAddress = remoteAddr;
     this.protocol = protocol;
@@ -130,11 +131,11 @@ public class RequestContext {
     return remoteAddress;
   }
 
-  public Class<? extends VersionedProtocol> getProtocol() {
+  public Class<? extends IpcProtocol> getProtocol() {
     return protocol;
   }
 
   public boolean isInRequest() {
     return inRequest;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Thu Feb 14 12:58:12 2013
@@ -23,16 +23,18 @@ import com.google.common.base.Function;
 import com.google.protobuf.Message;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-/**
- */
 @InterfaceAudience.Private
 public interface RpcServer {
+  // TODO: Needs cleanup.  Why a 'start', and then a 'startThreads' and an 'openServer'?
+  // Also, the call takes a RpcRequestBody, an already composed combination of
+  // rpc Request and metadata.  Should disentangle metadata and rpc Request Message.
 
   void setSocketSendBufSize(int size);
 
@@ -45,12 +47,12 @@ public interface RpcServer {
   InetSocketAddress getListenerAddress();
 
   /** Called for each call.
-   * @param param writable parameter
+   * @param param parameter
    * @param receiveTime time
-   * @return Message
+   * @return Message Protobuf response Message
    * @throws java.io.IOException e
    */
-  Message call(Class<? extends VersionedProtocol> protocol,
+  Message call(Class<? extends IpcProtocol> protocol,
       RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
       throws IOException;
 
@@ -62,9 +64,8 @@ public interface RpcServer {
 
   void startThreads();
 
-
   /**
    * Returns the metrics instance for reporting RPC call statistics
    */
   MetricsHBaseServer getMetrics();
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java Thu Feb 14 12:58:12 2013
@@ -121,4 +121,13 @@ public class ServerRpcController impleme
   public boolean failedOnException() {
     return serviceException != null;
   }
+
+  /**
+   * Throws an IOException back out if one is currently stored.
+   */
+  public void checkFailed() throws IOException {
+    if (failedOnException()) {
+      throw getFailedOn();
+    }
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java Thu Feb 14 12:58:12 2013
@@ -106,7 +106,20 @@ public class TableSplit implements Input
       Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
   }
 
+  @Override
   public int compareTo(TableSplit o) {
     return Bytes.compareTo(getStartRow(), o.getStartRow());
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof TableSplit)) {
+      return false;
+    }
+    TableSplit other = (TableSplit)o;
+    return Bytes.equals(m_tableName, other.m_tableName) &&
+      Bytes.equals(m_startRow, other.m_startRow) &&
+      Bytes.equals(m_endRow, other.m_endRow) &&
+      m_regionLocation.equals(other.m_regionLocation);
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Thu Feb 14 12:58:12 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
 * Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
 
     int batching = conf.getInt(EXPORT_BATCHING, -1);
     if (batching !=  -1){
-      try{
+      try {
         s.setBatch(batching);
-	} catch (RuntimeException e) {
-	    LOG.error("Batching could not be set", e);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
       }
     }
     LOG.info("versions=" + versions + ", starttime=" + startTime +

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Feb 14 12:58:12 2013
@@ -49,17 +49,18 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -82,7 +83,7 @@ import org.apache.hadoop.mapreduce.lib.p
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  TimeRangeTracker trt = new TimeRangeTracker();
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
@@ -106,6 +107,7 @@ public class HFileOutputFormat extends F
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
@@ -166,7 +168,6 @@ public class HFileOutputFormat extends F
 
         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
-        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -187,9 +188,9 @@ public class HFileOutputFormat extends F
         this.rollRequested = false;
       }
 
-      /* Create a new HFile.Writer.
+      /* Create a new StoreFile.Writer.
        * @param family
-       * @return A WriterLength, containing a new HFile.Writer.
+       * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
       private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +199,28 @@ public class HFileOutputFormat extends F
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactoryNoCache(conf)
-            .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
-            .withBlockSize(blocksize)
-            .withCompression(compression)
-            .withComparator(KeyValue.KEY_COMPARATOR)
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
             .withDataBlockEncoder(encoder)
             .withChecksumType(HStore.getChecksumType(conf))
             .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
-            .create();
+            .build();
+        
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +230,7 @@ public class HFileOutputFormat extends F
               Bytes.toBytes(true));
           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
               Bytes.toBytes(compactionExclude));
-          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
+          w.appendTrackedTimestampsToMetadata();
           w.close();
         }
       }
@@ -241,7 +249,7 @@ public class HFileOutputFormat extends F
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -359,7 +367,8 @@ public class HFileOutputFormat extends F
 
     // Set compression algorithms based on column families
     configureCompression(table, conf);
-
+    configureBloomType(table, conf);
+    
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
   }
@@ -375,25 +384,39 @@ public class HFileOutputFormat extends F
    *         algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
-    for (String familyConf : compressionConf.split("&")) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+  
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   * 
+   * @param conf
+   * @param confName
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
       String[] familySplit = familyConf.split("=");
       if (familySplit.length != 2) {
         continue;
       }
-
       try {
-        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
         throw new AssertionError(e);
       }
     }
-    return compressionMap;
+    return confValMap;
   }
-
+  
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
@@ -403,6 +426,8 @@ public class HFileOutputFormat extends F
    * @throws IOException
    *           on failure to read column family descriptors
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
   static void configureCompression(HTable table, Configuration conf) throws IOException {
     StringBuilder compressionConfigValue = new StringBuilder();
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -423,4 +448,35 @@ public class HFileOutputFormat extends F
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+  
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomType = familyDescriptor.getBloomFilterType().toString();
+      if (bloomType == null) {
+        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java Thu Feb 14 12:58:12 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,10 +48,10 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 @InterfaceAudience.Public
 public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
-  private static Log LOG = LogFactory.getLog(HLogInputFormat.class);
+  private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
 
-  public static String START_TIME_KEY = "hlog.start.time";
-  public static String END_TIME_KEY = "hlog.end.time";
+  public static final String START_TIME_KEY = "hlog.start.time";
+  public static final String END_TIME_KEY = "hlog.end.time";
 
   /**
    * {@link InputSplit} for {@link HLog} files. Each split represent

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Thu Feb 14 12:58:12 2013
@@ -19,27 +19,41 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Import data written by {@link Export}.
@@ -47,9 +61,15 @@ import org.apache.hadoop.util.GenericOpt
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Import {
+  private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+  // Optional filter to use for mappers
+  private static Filter filter;
 
   /**
    * A mapper that just writes out KeyValues.
@@ -72,6 +92,10 @@ public class Import {
     throws IOException {
       try {
         for (KeyValue kv : value.raw()) {
+          kv = filterKv(kv);
+          // skip if we filtered it out
+          if (kv == null) continue;
+
           context.write(row, convertKv(kv, cfRenameMap));
         }
       } catch (InterruptedException e) {
@@ -82,6 +106,7 @@ public class Import {
     @Override
     public void setup(Context context) {
       cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
     }
   }
 
@@ -91,6 +116,7 @@ public class Import {
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;
       
     /**
      * @param row  The current table row key.
@@ -116,6 +142,10 @@ public class Import {
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
+        kv = filterKv(kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
         kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
@@ -131,17 +161,106 @@ public class Import {
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }
 
     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
+
+    }
+  }
+
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}), so that
+   * keys rejected by the filter can be left out of the job output
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+   * @throws IllegalArgumentException if the filter is misconfigured
+   */
+  private static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (filterClass == null) {
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter:" + filterClass);
+
+    try {
+      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) m.invoke(null, getFilterArgs(conf));
+    } catch (IllegalAccessException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+    ArrayList<byte[]> args = new ArrayList<byte[]>();
+    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    for (String arg : sargs) {
+      // all the filters' instantiation methods expect quoted args since they are coming from
+      // the shell, so add the quotes here, though it shouldn't really be needed :-/
+      args.add(Bytes.toBytes("'" + arg + "'"));
+    }
+    return args;
+  }
+
+  /**
+   * Attempt to filter out the keyvalue
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+   *         {@link KeyValue}
+   */
+  private static KeyValue filterKv(KeyValue kv) {
+    // apply the filter and skip this kv if the filter doesn't apply
+    if (filter != null) {
+      Filter.ReturnCode code = filter.filterKeyValue(kv);
+      System.out.println("Filter returned:" + code);
+      // if it's not an accept type, then skip this kv
+      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+        if (LOG.isDebugEnabled()) {
+          System.out.println("Skipping key: " + kv + " from filter decision: " + code);
+        }
+        return null;
+      }
     }
+    return kv;
   }
 
   // helper: create a new KeyValue based on CF rename map
@@ -223,13 +342,33 @@ public class Import {
     }
     conf.set(CF_RENAME_PROP, sb.toString());
   }
-  
+
+  /**
+   * Add a Filter to be instantiated on import
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass to instantiate on the server.
+   * @param args List of arguments to pass to the filter on instantiation
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> args) {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+    // build the param string for the key
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < args.size(); i++) {
+      String arg = args.get(i);
+      builder.append(arg);
+      if (i != args.size() - 1) {
+        builder.append(",");
+      }
+    }
+    conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+  }
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
@@ -242,6 +381,17 @@ public class Import {
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJars(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
     if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
@@ -274,6 +424,15 @@ public class Import {
     System.err.println("By default Import will load data directly into HBase. To instead generate");
     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+        + CF_RENAME_PROP + " property. Futher, filters will only use the"
+        + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+        + "the KeyValue.");
     System.err.println("For performance consider the following options:\n"
         + "  -Dmapred.map.tasks.speculative.execution=false\n"
         + "  -Dmapred.reduce.tasks.speculative.execution=false");

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Feb 14 12:58:12 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.HB
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -71,11 +72,13 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -91,19 +94,30 @@ import com.google.common.util.concurrent
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class LoadIncrementalHFiles extends Configured implements Tool {
-  private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
-  static AtomicLong regionCount = new AtomicLong(0);
+  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  static final AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
   private Configuration cfg;
 
-  public static String NAME = "completebulkload";
-  private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  public static final String NAME = "completebulkload";
+  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   private boolean assignSeqIds;
 
-  public LoadIncrementalHFiles(Configuration conf) throws Exception {
+  private boolean useSecure;
+  private Token<?> userToken;
+  private String bulkToken;
+
+  //package private for testing
+  LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
     super(conf);
     this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
+    //added simply for testing
+    this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
+  }
+
+  public LoadIncrementalHFiles(Configuration conf) throws Exception {
+    this(conf, null);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
   }
 
@@ -215,6 +229,18 @@ public class LoadIncrementalHFiles exten
         return;
       }
 
+      //If using secure bulk load
+      //prepare staging directory and token
+      if(useSecure) {
+        FileSystem fs = FileSystem.get(cfg);
+        //This condition is here for unit testing
+        //Since delegation token doesn't work in mini cluster
+        if(User.isSecurityEnabled()) {
+         userToken = fs.getDelegationToken("renewer");
+        }
+        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
+      }
+
       // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
         // need to reload split keys each iteration.
@@ -243,6 +269,18 @@ public class LoadIncrementalHFiles exten
       }
 
     } finally {
+      if(useSecure) {
+        if(userToken != null) {
+          try {
+            userToken.cancel(cfg);
+          } catch (Exception e) {
+            LOG.warn("Failed to cancel HDFS delegation token.", e);
+          }
+        }
+        if(bulkToken != null) {
+          new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+        }
+      }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
@@ -476,11 +514,47 @@ public class LoadIncrementalHFiles exten
         tableName, first) {
       @Override
       public Boolean call() throws Exception {
-        LOG.debug("Going to connect to server " + location + " for row "
-            + Bytes.toStringBinary(row));
-        byte[] regionName = location.getRegionInfo().getRegionName();
-        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
-            assignSeqIds);
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+
+        try {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          if(!useSecure) {
+            success = ProtobufUtil.bulkLoadHFile(server, famPaths, regionName, assignSeqIds);
+          } else {
+            HTable table = new HTable(conn.getConfiguration(), tableName);
+            secureClient = new SecureBulkLoadClient(table);
+            success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
+          }
+          return success;
+        } finally {
+          //Best effort copying of files that might not have been imported
+          //from the staging directory back to original location
+          //in user directory
+          if(secureClient != null && !success) {
+            FileSystem fs = FileSystem.get(cfg);
+            for(Pair<byte[], String> el : famPaths) {
+              Path hfileStagingPath = null;
+              Path hfileOrigPath = new Path(el.getSecond());
+              try {
+                hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+                    hfileOrigPath.getName());
+                if(fs.rename(hfileStagingPath, hfileOrigPath)) {
+                  LOG.debug("Moved back file " + hfileOrigPath + " from " +
+                      hfileStagingPath);
+                } else if(fs.exists(hfileStagingPath)){
+                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                      hfileStagingPath);
+                }
+              } catch(Exception ex) {
+                LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                    hfileStagingPath, ex);
+              }
+            }
+          }
+        }
       }
     };
 
@@ -626,11 +700,11 @@ public class LoadIncrementalHFiles exten
     }
 
     HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = null;
+    HColumnDescriptor hcd;
 
     // Add column families
     // Build a set of keys
-    byte[][] keys = null;
+    byte[][] keys;
     TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     
     for (FileStatus stat : familyDirStatuses) {
@@ -667,10 +741,10 @@ public class LoadIncrementalHFiles exten
             " last="  + Bytes.toStringBinary(last));
           
           // To eventually infer start key-end key boundaries
-          Integer value = map.containsKey(first)?(Integer)map.get(first):0;
+          Integer value = map.containsKey(first)? map.get(first):0;
           map.put(first, value+1);
 
-          value = map.containsKey(last)?(Integer)map.get(last):0;
+          value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);
         }  finally {
           reader.close();

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Thu Feb 14 12:58:12 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -219,7 +220,8 @@ extends InputFormat<ImmutableBytesWritab
   private String reverseDNS(InetAddress ipAddress) throws NamingException {
     String hostName = this.reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
-      hostName = DNS.reverseDns(ipAddress, this.nameServer);
+      hostName = Strings.domainNamePointerToHostName(
+        DNS.reverseDns(ipAddress, this.nameServer));
       this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
     return hostName;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Thu Feb 14 12:58:12 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
           Delete del = null;
           KeyValue lastKV = null;
           for (KeyValue kv : value.getKeyValues()) {
-            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+            // filtering HLog meta entries
             if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
 
             // A WALEdit may contain multiple operations (HBASE-3584) and/or

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Feb 14 12:58:12 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.collect.LinkedHashMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.collect.LinkedHashMultimap;
+
 /**
  * Manages and performs region assignment.
  * <p>
@@ -162,8 +162,10 @@ public class AssignmentManager extends Z
    * that ServerShutdownHandler can be fully enabled and re-assign regions
    * of dead servers. So that when re-assignment happens, AssignmentManager
    * has proper region states.
+   *
+   * Protected to ease testing.
    */
-  final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
 
   /**
    * Constructs a new assignment manager.
@@ -610,7 +612,7 @@ public class AssignmentManager extends Z
    */
   private void handleRegion(final RegionTransition rt, int expectedVersion) {
     if (rt == null) {
-      LOG.warn("Unexpected NULL input " + rt);
+      LOG.warn("Unexpected NULL input for RegionTransition rt");
       return;
     }
     final ServerName sn = rt.getServerName();
@@ -1059,13 +1061,27 @@ public class AssignmentManager extends Z
               ZKUtil.listChildrenAndWatchForNewChildren(
                 watcher, watcher.assignmentZNode);
             if (children != null) {
+              Stat stat = new Stat();
               for (String child : children) {
                 // if region is in transition, we already have a watch
                 // on it, so no need to watch it again. So, as I know for now,
                 // this is needed to watch splitting nodes only.
                 if (!regionStates.isRegionInTransition(child)) {
-                  ZKUtil.watchAndCheckExists(watcher,
-                    ZKUtil.joinZNode(watcher.assignmentZNode, child));
+                  stat.setVersion(0);
+                  byte[] data = ZKAssign.getDataAndWatch(watcher,
+                    ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
+                  if (data != null && stat.getVersion() > 0) {
+                    try {
+                      RegionTransition rt = RegionTransition.parseFrom(data);
+
+                      //See HBASE-7551, handle splitting too, in case we miss the node change event
+                      if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
+                        handleRegion(rt, stat.getVersion());
+                      }
+                    } catch (DeserializationException de) {
+                      LOG.error("error getting data for " + child, de);
+                    }
+                  }
                 }
               }
             }
@@ -1461,6 +1477,7 @@ public class AssignmentManager extends Z
           return;
         }
         // This never happens. Currently regionserver close always return true.
+        // TODO: this can now happen (0.96) if there is an exception in a coprocessor
         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
           region.getRegionNameAsString());
       } catch (Throwable t) {
@@ -2633,11 +2650,11 @@ public class AssignmentManager extends Z
     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
   }
 
-  boolean isCarryingRoot(ServerName serverName) {
+  public boolean isCarryingRoot(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
   }
 
-  boolean isCarryingMeta(ServerName serverName) {
+  public boolean isCarryingMeta(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Thu Feb 14 12:58:12 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ServerNam
 
 /**
  * Run bulk assign.  Does one RCP per regionserver passing a
- * batch of regions using {@link SingleServerBulkAssigner}.
+ * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
  */
 @InterfaceAudience.Private
 public class GeneralBulkAssigner extends BulkAssigner {