You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2012/07/13 22:34:49 UTC

svn commit: r1361382 [14/16] - in /accumulo/branches/ACCUMULO-259: ./ bin/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/ conf/examples/3GB/native-standalone/ conf/e...

Added: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java?rev=1361382&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java (added)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java Fri Jul 13 20:34:44 2012
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * PermissionHandler implementation that keeps each user's system and table permissions in ZooKeeper.
+ */
+public class ZKPermHandler implements PermissionHandler {
+  private static final Logger log = Logger.getLogger(ZKPermHandler.class); // was keyed to ZKAuthorizor, misattributing this class's output
+  private static PermissionHandler zkPermHandlerInstance = null; // created on first call to getInstance()
+  
+  private String ZKUserPath; // assigned by initialize(); prefix of every per-user node path used below
+  private final ZooCache zooCache;
+  private final String ZKUserSysPerms = "/System"; // suffix of the node holding a user's system permissions
+  private final String ZKUserTablePerms = "/Tables"; // suffix of the parent node of a user's per-table permissions
+
+  public static synchronized PermissionHandler getInstance() {
+    if (zkPermHandlerInstance == null)
+      zkPermHandlerInstance = new ZKPermHandler();
+    return zkPermHandlerInstance;
+  }
+
+  public void initialize(String instanceId) {
+    ZKUserPath = ZKSecurityTool.getInstancePath(instanceId) + "/users";
+  }
+
+  public ZKPermHandler() {
+    zooCache = new ZooCache();
+  }
+
+  @Override
+  public boolean hasTablePermission(String user, String table, TablePermission permission) {
+    byte[] serializedPerms;
+    try {
+      serializedPerms = ZooReaderWriter.getRetryingInstance().getData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        return false;
+      }
+      log.warn("Unhandled KeeperException, failing closed for table permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      log.warn("Unhandled InterruptedException, failing closed for table permission check", e);
+      return false;
+    }
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+  
+  @Override
+  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+
+  @Override
+  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    try {
+      byte[] permBytes = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+      Set<SystemPermission> perms;
+      if (permBytes == null) {
+        perms = new TreeSet<SystemPermission>();
+      } else {
+        perms = ZKSecurityTool.convertSystemPermissions(permBytes);
+      }
+      
+      if (perms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    Set<TablePermission> tablePerms;
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+    if (serializedPerms != null)
+      tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    else
+      tablePerms = new TreeSet<TablePermission>();
+    
+    try {
+      if (tablePerms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] sysPermBytes = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+    
+    // User had no system permission, nothing to revoke.
+    if (sysPermBytes == null)
+      return;
+    
+    Set<SystemPermission> sysPerms = ZKSecurityTool.convertSystemPermissions(sysPermBytes);
+    
+    try {
+      if (sysPerms.remove(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+    
+    // User had no table permission, nothing to revoke.
+    if (serializedPerms == null)
+      return;
+    
+    Set<TablePermission> tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    try {
+      if (tablePerms.remove(permission)) {
+        zooCache.clear();
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        if (tablePerms.size() == 0)
+          zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
+        else
+          zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
+              NodeExistsPolicy.OVERWRITE);
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void cleanTablePermissions(String table) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        zooCache.clear();
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        for (String user : zooCache.getChildren(ZKUserPath))
+          zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException("unknownUser", SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void initializeSecurity(String rootuser) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    
+    // create the root user with all system privileges, no table privileges, and no record-level authorizations
+    Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
+    for (SystemPermission p : SystemPermission.values())
+      rootPerms.add(p);
+    Map<String,Set<TablePermission>> tablePerms = new HashMap<String,Set<TablePermission>>();
+    // Allow the root user to flush the !METADATA table
+    tablePerms.put(Constants.METADATA_TABLE_ID, Collections.singleton(TablePermission.ALTER_TABLE));
+    
+    try {
+      initUser(rootuser);
+      zoo.putPersistentData(ZKUserPath + "/" + rootuser + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(rootPerms), NodeExistsPolicy.FAIL);
+      for (Entry<String,Set<TablePermission>> entry : tablePerms.entrySet())
+        createTablePerm(rootuser, entry.getKey(), entry.getValue());
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * @param user the user whose node and "/Tables" child node are written with empty data (NodeExistsPolicy.SKIP)
+   * @throws AccumuloSecurityException with CONNECTION_ERROR when the ZooKeeper write raises a KeeperException
+   */
+  public void initUser(String user) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    try {
+      zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
+      zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms, new byte[0], NodeExistsPolicy.SKIP);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets up a new table configuration for the provided user/table. No checking for existence is done here, it should be done before calling.
+   */
+  private void createTablePerm(String user, String table, Set<TablePermission> perms) throws KeeperException, InterruptedException {
+    synchronized (zooCache) {
+      zooCache.clear();
+      ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
+          ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL);
+    }
+  }
+  
+  @Override
+  public void dropUser(String user) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP);
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP);
+        zooCache.clear(ZKUserPath + "/" + user);
+      }
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      if (e.code().equals(KeeperException.Code.NONODE))
+        throw new AccumuloSecurityException(user, SecurityErrorCode.USER_DOESNT_EXIST, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+      
+    }
+  }
+  
+  @Override
+  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms; // serialized system permissions, read through ZooReaderWriter rather than zooCache
+    try {
+      perms = ZooReaderWriter.getRetryingInstance().getData(ZKUserPath + "/" + user + ZKUserSysPerms, null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        return false; // no system-permission node for this user, so nothing is granted
+      }
+      log.warn("Unhandled KeeperException, failing closed for system permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      log.warn("Unhandled InterruptedException, failing closed for system permission check", e);
+      return false;
+    }
+    
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+  
+  @Override
+  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+  
+  @Override
+  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+    return true;
+  }
+}

Added: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java?rev=1361382&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java (added)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java Fri Jul 13 20:34:44 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.log4j.Logger;
+
+/**
+ * All the static tool methods used by the ZooKeeper security handlers, so that we can separate out stuff that isn't using ZooKeeper. That way, we can check
+ * the synchronization model more easily, as we only need to check to make sure zooCache is cleared when things are written to ZooKeeper in methods that might
+ * use it. These methods won't, and so don't need to be checked.
+ */
+class ZKSecurityTool {
+  private static final Logger log = Logger.getLogger(ZKSecurityTool.class);
+  private static final int SALT_LENGTH = 8;
+  
+  // Generates a byte array salt of length SALT_LENGTH
+  private static byte[] generateSalt() {
+    final SecureRandom random = new SecureRandom();
+    byte[] salt = new byte[SALT_LENGTH];
+    random.nextBytes(salt);
+    return salt;
+  }
+  
+  private static byte[] hash(byte[] raw) throws NoSuchAlgorithmException {
+    MessageDigest md = MessageDigest.getInstance(Constants.PW_HASH_ALGORITHM);
+    md.update(raw);
+    return md.digest();
+  }
+  
+  public static boolean checkPass(byte[] password, byte[] zkData) {
+    if (zkData == null || zkData.length < SALT_LENGTH) // missing, or too short to hold the salt prefix copied below
+      return false;
+    
+    byte[] salt = new byte[SALT_LENGTH];
+    System.arraycopy(zkData, 0, salt, 0, SALT_LENGTH); // stored form is salt + hash(password + salt), see convertPass
+    byte[] passwordToCheck;
+    try {
+      passwordToCheck = convertPass(password, salt);
+    } catch (NoSuchAlgorithmException e) {
+      log.error("Could not create hashed password", e);
+      return false;
+    }
+    return MessageDigest.isEqual(passwordToCheck, zkData); // digest comparison intended for secrets; same result as Arrays.equals here
+  }
+  
+  public static byte[] createPass(byte[] password) throws AccumuloException {
+    byte[] salt = generateSalt(); // a new random salt is drawn for every call
+    try {
+      return convertPass(password, salt); // salt + hash(password + salt)
+    } catch (NoSuchAlgorithmException e) {
+      log.error("Could not create hashed password", e);
+      throw new AccumuloException("Could not create hashed password", e);
+    }
+  }
+  
+  private static byte[] convertPass(byte[] password, byte[] salt) throws NoSuchAlgorithmException {
+    byte[] plainSalt = new byte[password.length + SALT_LENGTH];
+    System.arraycopy(password, 0, plainSalt, 0, password.length);
+    System.arraycopy(salt, 0, plainSalt, password.length, SALT_LENGTH);
+    byte[] hashed = hash(plainSalt);
+    byte[] saltedHash = new byte[SALT_LENGTH + hashed.length];
+    System.arraycopy(salt, 0, saltedHash, 0, SALT_LENGTH);
+    System.arraycopy(hashed, 0, saltedHash, SALT_LENGTH, hashed.length);
+    return saltedHash; // contains salt+hash(password+salt)
+  }
+  
+  public static Authorizations convertAuthorizations(byte[] authorizations) {
+    return new Authorizations(authorizations);
+  }
+  
+  public static byte[] convertAuthorizations(Authorizations authorizations) {
+    return authorizations.getAuthorizationsArray();
+  }
+
+  public static byte[] convertSystemPermissions(Set<SystemPermission> systempermissions) {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream(systempermissions.size());
+    DataOutputStream out = new DataOutputStream(bytes);
+    try {
+      for (SystemPermission sp : systempermissions)
+        out.writeByte(sp.getId());
+    } catch (IOException e) {
+      log.error(e, e);
+      throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+    }
+    return bytes.toByteArray();
+  }
+  
+  public static Set<SystemPermission> convertSystemPermissions(byte[] systempermissions) {
+    ByteArrayInputStream bytes = new ByteArrayInputStream(systempermissions);
+    DataInputStream in = new DataInputStream(bytes);
+    Set<SystemPermission> toReturn = new HashSet<SystemPermission>();
+    try {
+      while (in.available() > 0)
+        toReturn.add(SystemPermission.getPermissionById(in.readByte()));
+    } catch (IOException e) {
+      log.error("User database is corrupt; error converting system permissions", e);
+      toReturn.clear();
+    }
+    return toReturn;
+  }
+  
+  public static byte[] convertTablePermissions(Set<TablePermission> tablepermissions) {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream(tablepermissions.size());
+    DataOutputStream out = new DataOutputStream(bytes);
+    try {
+      for (TablePermission tp : tablepermissions)
+        out.writeByte(tp.getId());
+    } catch (IOException e) {
+      log.error(e, e);
+      throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+    }
+    return bytes.toByteArray();
+  }
+  
+  public static Set<TablePermission> convertTablePermissions(byte[] tablepermissions) {
+    Set<TablePermission> toReturn = new HashSet<TablePermission>();
+    for (byte b : tablepermissions)
+      toReturn.add(TablePermission.getPermissionById(b));
+    return toReturn;
+  }
+  
+  /**
+   * @param instanceId the instance id appended to the root path
+   * @return Constants.ZROOT + "/" + instanceId
+   */
+  public static String getInstancePath(String instanceId) {
+    return Constants.ZROOT + "/" + instanceId;
+  }
+}
\ No newline at end of file

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Jul 13 20:34:44 2012
@@ -53,6 +53,7 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,6 +116,8 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -124,6 +127,7 @@ import org.apache.accumulo.core.util.Dae
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -145,6 +149,7 @@ import org.apache.accumulo.server.master
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
@@ -184,6 +189,7 @@ import org.apache.accumulo.server.util.T
 import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -2539,6 +2545,8 @@ public class TabletServer extends Abstra
   
   private TServer server;
   
+  private DistributedWorkQueue bulkFailedCopyQ;
+  
   private static final String METRICS_PREFIX = "tserver";
   
   private static ObjectName OBJECT_NAME = null;
@@ -2587,12 +2595,12 @@ public class TabletServer extends Abstra
   }
   
   // Connect to the master for posting asynchronous results
-  private MasterClientService.Iface masterConnection(String address) {
+  private MasterClientService.Client masterConnection(String address) {
     try {
       if (address == null) {
         return null;
       }
-      MasterClientService.Iface client = ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.MASTER_CLIENTPORT,
+      MasterClientService.Client client = ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.MASTER_CLIENTPORT,
           Property.GENERAL_RPC_TIMEOUT, getSystemConfiguration());
       // log.info("Listener API to master has been opened");
       return client;
@@ -2602,14 +2610,14 @@ public class TabletServer extends Abstra
     return null;
   }
   
-  private void returnMasterConnection(MasterClientService.Iface client) {
+  private void returnMasterConnection(MasterClientService.Client client) {
     ThriftUtil.returnClient(client);
   }
   
   private int startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
-    TabletClientService.Iface tch = TraceWrap.service(new ThriftClientHandler());
-    TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+    Iface tch = TraceWrap.service(new ThriftClientHandler());
+    Processor<Iface> processor = new Processor<Iface>(tch);
     int port = startServer(getSystemConfiguration(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
     log.info("port = " + port);
     return port;
@@ -2678,13 +2686,23 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
+    
+    ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+
+    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
+    try {
+      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+    } catch (Exception e1) {
+      throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
+    }
+    
     try {
-      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
     } catch (Exception ex) {
       log.error("Error setting watches for recoveries");
       throw new RuntimeException(ex);
     }
-    
+
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.
@@ -2700,7 +2718,7 @@ public class TabletServer extends Abstra
       // send all of the pending messages
       try {
         MasterMessage mm = null;
-        MasterClientService.Iface iface = null;
+        MasterClientService.Client iface = null;
         
         try {
           // wait until a message is ready to send, or a sever stop
@@ -2797,11 +2815,21 @@ public class TabletServer extends Abstra
   private long totalMinorCompactions;
   
   public static SortedMap<KeyExtent,Text> verifyTabletInformation(KeyExtent extent, TServerInstance instance, SortedMap<Key,Value> tabletsKeyValues,
-      String clientAddress, ZooLock lock) throws AccumuloSecurityException {
+      String clientAddress, ZooLock lock) throws AccumuloSecurityException, DistributedStoreException {
     for (int tries = 0; tries < 3; tries++) {
       try {
         log.debug("verifying extent " + extent);
         if (extent.isRootTablet()) {
+          ZooTabletStateStore store = new ZooTabletStateStore();
+          if (!store.iterator().hasNext()) {
+            log.warn("Illegal state: location is not set in zookeeper");
+            return null;
+          }
+          TabletLocationState next = store.iterator().next();
+          if (!instance.equals(next.future)) {
+            log.warn("Future location is not to this server for the root tablet");
+            return null;
+          }
           TreeMap<KeyExtent,Text> set = new TreeMap<KeyExtent,Text>();
           set.put(extent, new Text(Constants.ZROOT_TABLET));
           return set;

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Fri Jul 13 20:34:44 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
 /**
  * 
  */
 public class LogSorter {
   
+
   private static final Logger log = Logger.getLogger(LogSorter.class);
   FileSystem fs;
   AccumuloConfiguration conf;
   
-  private Map<String,Work> currentWork = new HashMap<String,Work>();
+  private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
 
-  class Work implements Runnable {
-    final String name;
-    FSDataInputStream input;
-    final String destPath;
-    long bytesCopied = -1;
-    long sortStart = 0;
-    long sortStop = -1;
-    private final LogSortNotifier cback;
+  class LogProcessor implements Processor {
     
-    synchronized long getBytesCopied() throws IOException {
-      return input == null ? bytesCopied : input.getPos();
+    private FSDataInputStream input;
+    private long bytesCopied = -1;
+    private long sortStart = 0;
+    private long sortStop = -1;
+    
+    @Override
+    public Processor newProcessor() {
+      return new LogProcessor();
+    }
+    
+    @Override
+    public void process(String child, byte[] data) {
+      String dest = Constants.getRecoveryDir(conf) + "/" + child;
+      String src = new String(data);
+      String name = new Path(src).getName();
+      
+      synchronized (currentWork) {
+        if (currentWork.containsKey(name))
+          return;
+        currentWork.put(name, this);
+      }
+      
+      try {
+        log.info("Copying " + src + " to " + dest);
+        sort(name, new Path(src), dest);
+      } finally {
+        currentWork.remove(name);
+      }
+      
     }
     
-    Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
-      this.name = name;
-      this.input = input;
-      this.destPath = destPath;
-      this.cback = cback;
-    }
-    synchronized boolean finished() {
-      return input == null;
-    }
-    public void run() {
-      sortStart = System.currentTimeMillis();
+    public void sort(String name, Path srcPath, String destPath) {
+
+      synchronized (this) {
+        sortStart = System.currentTimeMillis();
+      }
+
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
+        
+        // the following call does not throw an exception if the file/dir does not exist
+        fs.delete(new Path(destPath), true);
+
+        FSDataInputStream tmpInput = fs.open(srcPath);
+        synchronized (this) {
+          this.input = tmpInput;
+        }
+
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
         while (true) {
-          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
           try {
             long start = input.getPos();
             while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
               LogFileValue value = new LogFileValue();
               key.readFields(input);
               value.readFields(input);
-              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
             }
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             buffer.clear();
           } catch (EOFException ex) {
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             break;
           }
         }
         fs.create(new Path(destPath, "finished")).close();
-        log.debug("Log copy/sort of " + name + " complete");
+        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
+          // parent dir may not exist
+          fs.mkdirs(new Path(destPath));
           fs.create(new Path(destPath, "failed")).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
         log.error(t, t);
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
-        } catch (Exception ex) {
-          log.error("Strange error notifying the master of a logSort problem for file " + name);
-        }
       } finally {
         Thread.currentThread().setName(formerThreadName);
         try {
@@ -133,19 +147,13 @@ public class LogSorter {
         } catch (IOException e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
-        sortStop = System.currentTimeMillis();
-        synchronized (currentWork) {
-          currentWork.remove(name);
-        }
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
-        } catch (Exception ex) {
-          log.error("Strange error reporting successful log sort " + name, ex);
+        synchronized (this) {
+          sortStop = System.currentTimeMillis();
         }
       }
     }
     
-    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       String path = destPath + String.format("/part-r-%05d", part++);
       MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
       try {
@@ -162,7 +170,7 @@ public class LogSorter {
         output.close();
       }
     }
-    
+
     synchronized void close() throws IOException {
       bytesCopied = input.getPos();
       input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
       }
       return 0;
     }
-  };
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+  }
   
-  final ThreadPoolExecutor threadPool;
+  ThreadPoolExecutor threadPool;
   private Instance instance;
   
   public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
-  
-  public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
-    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
-    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    zoo.mkdirs(path);
-    List<String> children = zoo.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getType()) {
-          case NodeChildrenChanged:
-            if (event.getPath().equals(path))
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            else
-              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
-            break;
-          case NodeCreated:
-          case NodeDataChanged:
-          case NodeDeleted:
-          case None:
-            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
-            break;
-          
-        }
-      }
-    });
-    attemptRecoveries(zoo, serverName, path, children);
-    Random r = new Random();
-    // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    SimpleTimer.getInstance().schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-        } catch (KeeperException e) {
-          log.error("Unable to get recovery information", e);
-        } catch (InterruptedException e) {
-          log.info("Interrupted getting recovery information", e);
-        }        
-      }
-    }, r.nextInt(1000), 60 * 1000);
-  }
-  
-  private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
-    if (children.size() == 0)
-      return;
-    
-    if (threadPool.getQueue().size() > 1)
-      return;
 
-    log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
-    Random random = new Random();
-    Collections.shuffle(children, random);
-    try {
-      for (String child : children) {
-        final String childPath = path + "/" + child;
-        log.debug("Attempting to lock " + child);
-        ZooLock lock = new ZooLock(childPath);
-        if (lock.tryLock(new LockWatcher() {
-          @Override
-          public void lostLock(LockLossReason reason) {
-            log.info("Ignoring lost lock event, reason " + reason);
-          }
-        }, serverName.getBytes())) {
-          // Great... we got the lock, but maybe we're too busy
-          if (threadPool.getQueue().size() > 1) {
-            lock.unlock();
-            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
-            break;
-          }
-          log.debug("got lock for " + child);
-          byte[] contents = zoo.getData(childPath, null);
-          String destination = Constants.getRecoveryDir(conf) + "/" + child;
-          startSort(new String(contents), destination, new LogSortNotifier() {
-            @Override
-            public void notice(String name, long bytes, int parts, long milliseconds, String error) {
-              log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
-              try {
-                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
-              } catch (Exception e) {
-                log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
-              }
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            }
-          });
-        } else {
-          log.debug("failed to get the lock " + child);
-        }
-      }
-    } catch (Throwable t) {
-      log.error("Unexpected error", t);
-    }
-  }
-
-  public interface LogSortNotifier {
-    public void notice(String name, long bytes, int parts, long milliseconds, String error);
-  }
-
-  private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
-    log.info("Copying " + src + " to " + dest);
-    fs.delete(new Path(dest), true);
-    Path srcPath = new Path(src);
-    synchronized (currentWork) {
-      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
-      if (!currentWork.containsKey(srcPath.getName())) {
-        threadPool.execute(work);
-        currentWork.put(srcPath.getName(), work);
-      }
-    }
+  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+    this.threadPool = distWorkQThreadPool;
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
   }
   
   public List<RecoveryStatus> getLogSorts() {
     List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
     synchronized (currentWork) {
-      for (Entry<String,Work> entries : currentWork.entrySet()) {
+      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
         RecoveryStatus status = new RecoveryStatus();
         status.name = entries.getKey();
         try {

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java Fri Jul 13 20:34:44 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.table
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -48,7 +49,7 @@ public class SplitReportMessage implemen
     TabletSplit split = new TabletSplit();
     split.oldTablet = old_extent.toThrift();
     split.newTablets = Translator.translate(extents.keySet(), Translator.KET);
-    client.reportSplitExtent(null, credentials, serverName, split);
+    client.reportSplitExtent(Tracer.traceInfo(), credentials, serverName, split);
   }
   
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java Fri Jul 13 20:34:44 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.tabletserver.mastermessage;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.TabletLoadState;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
@@ -34,6 +35,6 @@ public class TabletStatusMessage impleme
   }
   
   public void send(AuthInfo auth, String serverName, Iface client) throws TException, ThriftSecurityException {
-    client.reportTabletStatus(null, auth, serverName, status, extent.toThrift());
+    client.reportTabletStatus(Tracer.traceInfo(), auth, serverName, status, extent.toThrift());
   }
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Fri Jul 13 20:34:44 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.test;
 import java.io.IOException;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.master.MasterNotRunningException;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -43,7 +44,7 @@ public class GetMasterStats {
     MasterMonitorInfo stats = null;
     try {
       client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
-      stats = client.getMasterStats(null, SecurityConstants.getSystemCredentials());
+      stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
     } finally {
       if (client != null)
         MasterClient.close(client);

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java Fri Jul 13 20:34:44 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.test;
 
 import java.nio.ByteBuffer;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
@@ -39,7 +40,7 @@ public class WrongTabletTest {
       Mutation mutation = new Mutation(new Text("row_0003750001"));
       // mutation.set(new Text("colf:colq"), new Value("val".getBytes()));
       mutation.putDelete(new Text("colf"), new Text("colq"));
-      client.update(null, rootCredentials, new KeyExtent(new Text("test_ingest"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
+      client.update(Tracer.traceInfo(), rootCredentials, new KeyExtent(new Text("test_ingest"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java Fri Jul 13 20:34:44 2012
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -126,7 +127,7 @@ public class ContinuousStatsCollector {
       MasterClientService.Iface client = null;
       try {
         client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
-        MasterMonitorInfo stats = client.getMasterStats(null, SecurityConstants.getSystemCredentials());
+        MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
         
         TableInfo all = new TableInfo();
         Map<String,TableInfo> tableSummaries = new HashMap<String,TableInfo>();

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java Fri Jul 13 20:34:44 2012
@@ -80,46 +80,46 @@ public class UndefinedAnalyzer {
     
     private void parseLog(File log) throws Exception {
       BufferedReader reader = new BufferedReader(new FileReader(log));
-      
       String line;
       TreeMap<Long,Long> tm = null;
-      
-      while ((line = reader.readLine()) != null) {
-        if (!line.startsWith("UUID"))
-          continue;
-        String[] tokens = line.split("\\s");
-        String time = tokens[1];
-        String uuid = tokens[2];
-        
-        if (flushes.containsKey(uuid)) {
-          System.err.println("WARN Duplicate uuid " + log);
+      try {
+        while ((line = reader.readLine()) != null) {
+          if (!line.startsWith("UUID"))
+            continue;
+          String[] tokens = line.split("\\s");
+          String time = tokens[1];
+          String uuid = tokens[2];
+          
+          if (flushes.containsKey(uuid)) {
+            System.err.println("WARN Duplicate uuid " + log);
+            return;
+          }
+          
+          tm = new TreeMap<Long,Long>(Collections.reverseOrder());
+          tm.put(0l, Long.parseLong(time));
+          flushes.put(uuid, tm);
+          break;
+          
+        }
+        if (tm == null) {
+          System.err.println("WARN Bad ingest log " + log);
           return;
         }
         
-        tm = new TreeMap<Long,Long>(Collections.reverseOrder());
-        tm.put(0l, Long.parseLong(time));
-        flushes.put(uuid, tm);
-        break;
-        
-      }
-      
-      if (tm == null) {
-        System.err.println("WARN Bad ingest log " + log);
-        return;
-      }
-      
-      while ((line = reader.readLine()) != null) {
-        String[] tokens = line.split("\\s");
-        
-        if (!tokens[0].equals("FLUSH"))
-          continue;
-        
-        String time = tokens[1];
-        String count = tokens[4];
-        
-        tm.put(Long.parseLong(count), Long.parseLong(time));
+        while ((line = reader.readLine()) != null) {
+          String[] tokens = line.split("\\s");
+          
+          if (!tokens[0].equals("FLUSH"))
+            continue;
+          
+          String time = tokens[1];
+          String count = tokens[4];
+          
+          tm.put(Long.parseLong(count), Long.parseLong(time));
+        }
+      } finally {
+        reader.close();
       }
-      
     }
     
     Iterator<Long> getTimes(String uuid, long count) {
@@ -172,45 +172,49 @@ public class UndefinedAnalyzer {
         
         BufferedReader reader = new BufferedReader(new FileReader(masterLog));
         String line;
-        while ((line = reader.readLine()) != null) {
-          if (line.contains("TABLET_LOADED")) {
-            String[] tokens = line.split("\\s+");
-            String tablet = tokens[8];
-            String server = tokens[10];
-            
-            int pos1 = -1;
-            int pos2 = -1;
-            int pos3 = -1;
-            
-            for (int i = 0; i < tablet.length(); i++) {
-              if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
-                if (pos1 == -1) {
-                  pos1 = i;
-                } else if (pos2 == -1) {
-                  pos2 = i;
-                } else {
-                  pos3 = i;
+        try {
+          while ((line = reader.readLine()) != null) {
+            if (line.contains("TABLET_LOADED")) {
+              String[] tokens = line.split("\\s+");
+              String tablet = tokens[8];
+              String server = tokens[10];
+              
+              int pos1 = -1;
+              int pos2 = -1;
+              int pos3 = -1;
+              
+              for (int i = 0; i < tablet.length(); i++) {
+                if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+                  if (pos1 == -1) {
+                    pos1 = i;
+                  } else if (pos2 == -1) {
+                    pos2 = i;
+                  } else {
+                    pos3 = i;
+                  }
                 }
               }
-            }
-            
-            if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
-              String tid = tablet.substring(0, pos1);
-              String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
-              String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
-              if (tid.equals(tableId)) {
-                // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
-                Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
-                // System.out.println(" "+date);
-                
-                assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
-                
+              
+              if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+                String tid = tablet.substring(0, pos1);
+                String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
+                String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+                if (tid.equals(tableId)) {
+                  // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
+                  Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
+                  // System.out.println(" "+date);
+                  
+                  assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+                  
+                }
+              } else if (!tablet.startsWith("!0")) {
+                System.err.println("Cannot parse tablet " + tablet);
               }
-            } else if (!tablet.startsWith("!0")) {
-              System.err.println("Cannot parse tablet " + tablet);
+              
             }
-            
           }
+        } finally {
+          reader.close();
         }
       }
     }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java Fri Jul 13 20:34:44 2012
@@ -70,6 +70,7 @@ public class CacheTestReader {
       
       oos.writeObject(readData);
       
+      fos.close();
       oos.close();
       
       UtilWaitThread.sleep(20);

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java Fri Jul 13 20:34:44 2012
@@ -132,6 +132,7 @@ public class CacheTestWriter {
               @SuppressWarnings("unchecked")
               Map<String,String> readerMap = (Map<String,String>) ois.readObject();
               
+              fis.close();
               ois.close();
               
               System.out.println("read " + readerMap);

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java Fri Jul 13 20:34:44 2012
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Random;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -29,6 +30,8 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
@@ -96,7 +99,7 @@ public class ZombieTServer {
     
     TransactionWatcher watcher = new TransactionWatcher();
     final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher);
-    TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+    Processor<Iface> processor = new Processor<Iface>(tch);
     ServerPort serverPort = TServerUtils.startTServer(port, processor, "ZombieTServer", "walking dead", 2, 1000);
     
     InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), serverPort.port);
@@ -111,7 +114,7 @@ public class ZombieTServer {
       @Override
       public void lostLock(final LockLossReason reason) {
         try {
-          tch.halt(null, null, null);
+          tch.halt(Tracer.traceInfo(), null, null);
         } catch (Exception ex) {
           log.error(ex, ex);
           System.exit(1);

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Fri Jul 13 20:34:44 2012
@@ -47,6 +47,8 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -205,7 +207,7 @@ public class NullTserver {
     
     TransactionWatcher watcher = new TransactionWatcher();
     ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
-    TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+    Processor<Iface> processor = new Processor<Iface>(tch);
     TServerUtils.startTServer(port, processor, "NullTServer", "null tserver", 2, 1000);
     
     InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), port);

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java Fri Jul 13 20:34:44 2012
@@ -103,7 +103,9 @@ public class Framework {
     String module = args[3];
     
     Properties props = new Properties();
-    props.load(new FileInputStream(configDir + "/randomwalk.conf"));
+    FileInputStream fis = new FileInputStream(configDir + "/randomwalk.conf");
+    props.load(fis);
+    fis.close();
     
     System.setProperty("localLog", localLogPath + "/" + logId);
     System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + logId);

Copied: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java (from r1359316, accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java?p2=accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java&p1=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java&r1=1359316&r2=1361382&rev=1361382&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java Fri Jul 13 20:34:44 2012
@@ -10,7 +10,8 @@ public class StartAll extends Test {
   @Override
   public void visit(State state, Properties props) throws Exception {
     log.info("Starting all servers");
-    Runtime.getRuntime().exec(new String[]{System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"});
+    Process exec = Runtime.getRuntime().exec(new String[]{System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"});
+    exec.waitFor();
   }
   
 }

Copied: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java (from r1359316, accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java?p2=accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java&p1=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java&r1=1359316&r2=1361382&rev=1361382&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java Fri Jul 13 20:34:44 2012
@@ -2,36 +2,63 @@ package org.apache.accumulo.server.test.
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.test.randomwalk.State;
 import org.apache.accumulo.server.test.randomwalk.Test;
+import org.apache.accumulo.server.util.AddressUtil;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 public class StopTabletServer extends Test {
   
+  Set<TServerInstance> getTServers(Instance instance) throws KeeperException, InterruptedException {
+    Set<TServerInstance> result = new HashSet<TServerInstance>();
+    ZooReader rdr = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+    String base = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+    for (String child : rdr.getChildren(base)) {
+      try {
+        List<String> children = rdr.getChildren(base + "/" + child);
+        if (children.size() > 0) {
+          Collections.sort(children);
+          Stat stat = new Stat();
+          byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat);
+          if (!"master".equals(new String(data))) {
+            result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT), stat.getEphemeralOwner()));
+          }
+        }
+      } catch (KeeperException.NoNodeException ex) {
+        // someone beat us to it
+      }
+    }
+    return result;
+  }
+  
   @Override
   public void visit(State state, Properties props) throws Exception {
     
-    LiveTServerSet set = new LiveTServerSet(state.getInstance(), DefaultConfiguration.getDefaultConfiguration(), new Listener() {
-      @Override
-      public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
-        log.info("Tablet server set changed: " + deleted + " deleted and " + added + " added");
-      }
-    });
-    List<TServerInstance> currentServers = new ArrayList<TServerInstance>(set.getCurrentServers());
+    Instance instance = state.getInstance();
+    
+    List<TServerInstance> currentServers = new ArrayList<TServerInstance>(getTServers(instance));
     Collections.shuffle(currentServers);
     Runtime runtime = Runtime.getRuntime();
     if (currentServers.size() > 1) {
       TServerInstance victim = currentServers.get(0);
       log.info("Stopping " + victim.hostPort());
-      runtime.exec(new String[] {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()});
-      if (set.getCurrentServers().contains(victim))
+      Process exec = runtime.exec(new String[] {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()});
+      if (exec.waitFor() != 0)
+        throw new RuntimeException("admin stop returned a non-zero response: " + exec.exitValue());
+      Set<TServerInstance> set = getTServers(instance);
+      if (set.contains(victim))
         throw new RuntimeException("Failed to stop " + victim);
     }
   }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java Fri Jul 13 20:34:44 2012
@@ -31,10 +31,6 @@ import org.apache.accumulo.server.test.r
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
-/**
- * @author jwvines
- * 
- */
 public class SecurityHelper {
   protected final static Logger log = Logger.getLogger(SecurityHelper.class);
   
@@ -127,6 +123,9 @@ public class SecurityHelper {
   public static void setTabPerm(State state, String userName, TablePermission tp, boolean value) {
     log.debug((value ? "Gave" : "Took") + " the table permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
     state.set("Tab" + userName + tp.name(), Boolean.toString(value));
+    if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE))
+      state.set("Tab" + userName + tp.name() + "time", System.currentTimeMillis());
+
   }
   
   public static boolean getSysPerm(State state, String userName, SystemPermission tp) {
@@ -194,4 +193,19 @@ public class SecurityHelper {
     return fs;
   }
   
+  /**
+   * @param state
+   * @param userName
+   * @param tp
+   * @return
+   */
+  public static boolean inAmbiguousZone(State state, String userName, TablePermission tp) {
+    if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE)) {
+      Long setTime = (Long) state.get("Tab" + userName + tp.name() + "time");
+      if (System.currentTimeMillis() < (setTime + 1000))
+        return true;
+    }
+    return false;
+  }
+  
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java Fri Jul 13 20:34:44 2012
@@ -78,11 +78,13 @@ public class TableOp extends Test {
     boolean hasPerm = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), tp);
     
     String tableName = state.getString("secTableName");
+    boolean ambiguousZone;
     
     switch (tp) {
       case READ:
         Authorizations auths = SecurityHelper.getUserAuths(state, SecurityHelper.getTabUserName(state));
         boolean canRead = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), TablePermission.READ);
+        ambiguousZone = SecurityHelper.inAmbiguousZone(state, SecurityHelper.getTabUserName(state), tp);
         try {
           Scanner scan = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(SecurityHelper.getTabUserName(state)));
           int seen = 0;
@@ -94,7 +96,7 @@ public class TableOp extends Test {
             if (!auths.contains(k.getColumnVisibilityData()))
               throw new AccumuloException("Got data I should not be capable of seeing: " + k + " table " + tableName);
           }
-          if (!canRead)
+          if (!canRead && !ambiguousZone)
             throw new AccumuloException("Was able to read when I shouldn't have had the perm with connection user " + conn.whoami() + " table " + tableName);
           for (Entry<String,Integer> entry : SecurityHelper.getAuthsMap(state).entrySet()) {
             if (auths.contains(entry.getKey().getBytes()))
@@ -108,7 +110,7 @@ public class TableOp extends Test {
           return;
         } catch (AccumuloSecurityException ae) {
           if (ae.getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
-            if (canRead)
+            if (canRead && !ambiguousZone)
               throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, ae);
             else
               return;
@@ -117,7 +119,7 @@ public class TableOp extends Test {
         } catch (RuntimeException re) {
           if (re.getCause() instanceof AccumuloSecurityException
               && ((AccumuloSecurityException) re.getCause()).getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
-            if (canRead)
+            if (canRead && !ambiguousZone)
               throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, re.getCause());
             else
               return;
@@ -127,6 +129,9 @@ public class TableOp extends Test {
         
         break;
       case WRITE:
+        // boolean canWrite = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), TablePermission.WRITE);
+        ambiguousZone = SecurityHelper.inAmbiguousZone(state, SecurityHelper.getTabUserName(state), tp);
+
         String key = SecurityHelper.getLastKey(state) + "1";
         Mutation m = new Mutation(new Text(key));
         for (String s : SecurityHelper.getAuthsArray()) {
@@ -143,8 +148,20 @@ public class TableOp extends Test {
         boolean works = true;
         try {
           writer.addMutation(m);
+          writer.close();
         } catch (MutationsRejectedException mre) {
-          throw new AccumuloException("Mutation exception!", mre);
+          // Currently no method for detecting reason for mre. Waiting on ACCUMULO-670
+          // For now, just wait a second and go again!
+
+          if (ambiguousZone) {
+            Thread.sleep(1000);
+            try {
+              writer.addMutation(m);
+              writer.close();
+            } catch (MutationsRejectedException mre2) {
+              throw new AccumuloException("Mutation exception!", mre2);
+            }
+          }
         }
         if (works)
           for (String s : SecurityHelper.getAuthsArray())

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java Fri Jul 13 20:34:44 2012
@@ -52,8 +52,11 @@ public class Run {
     Properties scaleProps = new Properties();
     Properties testProps = new Properties();
     try {
-      scaleProps.load(new FileInputStream(sitePath));
-      testProps.load(new FileInputStream(testPath));
+      FileInputStream fis = new FileInputStream(sitePath);
+      scaleProps.load(fis);
+      fis.close();
+      fis = new FileInputStream(testPath);
+      testProps.load(fis);
     } catch (Exception e) {
       System.out.println("Problem loading config file");
       e.printStackTrace();

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Fri Jul 13 20:34:44 2012
@@ -23,7 +23,8 @@ import java.util.TimerTask;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.thrift.RemoteSpan;
-import org.apache.accumulo.cloudtrace.thrift.SpanReceiver;
+import org.apache.accumulo.cloudtrace.thrift.SpanReceiver.Processor;
+import org.apache.accumulo.cloudtrace.thrift.SpanReceiver.Iface;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
@@ -110,7 +111,7 @@ public class TraceServer implements Watc
     }
   }
   
-  class Receiver implements SpanReceiver.Iface {
+  class Receiver implements Iface {
     @Override
     public void span(RemoteSpan s) throws TException {
       String idString = Long.toHexString(s.traceId);
@@ -172,7 +173,7 @@ public class TraceServer implements Watc
     sock.bind(new InetSocketAddress(port));
     final TServerTransport transport = new TServerSocket(sock);
     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
-    options.processor(new SpanReceiver.Processor(new Receiver()));
+    options.processor(new Processor<Iface>(new Receiver()));
     server = new TThreadPoolServer(options);
     final InetSocketAddress address = new InetSocketAddress(hostname, sock.getLocalPort());
     registerInZooKeeper(AddressUtil.toString(address));

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java Fri Jul 13 20:34:44 2012
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 import jline.ConsoleReader;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.impl.ClientExec;
@@ -119,10 +120,10 @@ public class Admin {
   }
   
   private static void stopServer(final AuthInfo credentials, final boolean tabletServersToo) throws AccumuloException, AccumuloSecurityException {
-    MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Iface>() {
+    MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>() {
       @Override
-      public void execute(MasterClientService.Iface client) throws Exception {
-        client.shutdown(null, credentials, tabletServersToo);
+      public void execute(MasterClientService.Client client) throws Exception {
+        client.shutdown(Tracer.traceInfo(), credentials, tabletServersToo);
       }
     });
   }
@@ -130,10 +131,10 @@ public class Admin {
   private static void stopTabletServer(String server, final boolean force) throws AccumuloException, AccumuloSecurityException {
     InetSocketAddress address = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
     final String finalServer = org.apache.accumulo.core.util.AddressUtil.toString(address);
-    MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Iface>() {
+    MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>() {
       @Override
-      public void execute(MasterClientService.Iface client) throws Exception {
-        client.shutdownTabletServer(null, SecurityConstants.getSystemCredentials(), finalServer, force);
+      public void execute(MasterClientService.Client client) throws Exception {
+        client.shutdownTabletServer(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), finalServer, force);
       }
     });
   }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java Fri Jul 13 20:34:44 2012
@@ -60,11 +60,12 @@ public class FileSystemMonitor {
     BufferedReader br = new BufferedReader(fr);
     
     String line;
-    
+    try {
     while ((line = br.readLine()) != null)
       mounts.add(new Mount(line));
-    
-    br.close();
+    } finally {
+      br.close();
+    }
     
     return mounts;
   }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Fri Jul 13 20:34:44 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.Random;
@@ -43,10 +42,10 @@ import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -148,7 +147,12 @@ public class TServerUtils {
         metrics.add(ThriftMetrics.idle, (now - idleStart));
       }
       try {
-        return other.process(in, out);
+        try {
+          return other.process(in, out);
+        } catch (NullPointerException ex) {
+          // THRIFT-1447 - remove with thrift 0.9
+          return true;
+        }
       } finally {
         if (metrics.isEnabled()) {
           idleStart = System.currentTimeMillis();
@@ -173,34 +177,6 @@ public class TServerUtils {
     }
   }
   
-  public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
-    public THsHaServer(Args args) {
-      super(args);
-    }
-    
-    protected Runnable getRunnable(FrameBuffer frameBuffer) {
-      return new Invocation(frameBuffer);
-    }
-    
-    private class Invocation implements Runnable {
-      
-      private final FrameBuffer frameBuffer;
-      
-      public Invocation(final FrameBuffer frameBuffer) {
-        this.frameBuffer = frameBuffer;
-      }
-      
-      public void run() {
-        if (frameBuffer.trans_ instanceof TNonblockingSocket) {
-          TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
-          Socket sock = tsock.getSocketChannel().socket();
-          clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
-        }
-        frameBuffer.invoke();
-      }
-    }
-  }
-  
   public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks) throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(port);