Posted to commits@accumulo.apache.org by af...@apache.org on 2012/07/05 23:03:21 UTC

svn commit: r1357909 [2/3] - in /accumulo/branches/ACCUMULO-652: ./ bin/ core/ core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java...

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java Thu Jul  5 21:03:16 2012
@@ -20,10 +20,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.Logger;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java Thu Jul  5 21:03:16 2012
@@ -28,14 +28,14 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Thu Jul  5 21:03:16 2012
@@ -16,13 +16,19 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -33,6 +39,8 @@ import org.apache.accumulo.cloudtrace.in
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -42,30 +50,34 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -133,9 +145,8 @@ public class BulkImport extends MasterRe
     Utils.getReadLock(tableId, tid).lock();
     
     // check that the error directory exists and is empty
-    FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-        ServerConfiguration.getSiteConfiguration()));
-    ;
+    FileSystem fs = master.getFileSystem();
+
     Path errorPath = new Path(errorDir);
     FileStatus errorStatus = fs.getFileStatus(errorPath);
     if (errorStatus == null)
@@ -273,24 +284,6 @@ class CleanUpBulkImport extends MasterRe
   }
   
   @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 1000;
-  }
-  
-  @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug("removing the bulk processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
@@ -327,8 +320,124 @@ class CompleteBulkImport extends MasterR
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return new CopyFailed(tableId, source, bulk, error);
+  }
+}
+
+class CopyFailed extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  public CopyFailed(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    Set<TServerInstance> finished = new HashSet<TServerInstance>();
+    Set<TServerInstance> running = master.onlineTabletServers();
+    for (TServerInstance server : running) {
+      try {
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
+          finished.add(server);
+      } catch (TException ex) {
+        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
+      }
+    }
+    if (finished.containsAll(running))
+      return 0;
+    return 500;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+	//This needs to execute after the arbiter is stopped  
+	  
+    FileSystem fs = environment.getFileSystem();
+	  
+    if (!fs.exists(new Path(error, "failures.txt")))
+      return new CleanUpBulkImport(tableId, source, bulk, error);
+    
+    HashMap<String,String> failures = new HashMap<String,String>();
+    HashMap<String,String> loadedFailures = new HashMap<String,String>();
+    
+    FSDataInputStream failFile = fs.open(new Path(error, "failures.txt"));
+    BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        Path path = new Path(line);
+        if (!fs.exists(new Path(error, path.getName())))
+          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+      }
+    } finally {
+      failFile.close();
+    }
+    
+    /*
+     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
+     * have no loaded markers.
+     */
+
+    // determine which failed files were loaded
+    AuthInfo creds = SecurityConstants.getSystemCredentials();
+    Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password);
+    Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+    
+    for (Entry<Key,Value> entry : mscanner) {
+      if (Long.parseLong(entry.getValue().toString()) == tid) {
+        String loadedFile = entry.getKey().getColumnQualifierData().toString();
+        String absPath = failures.remove(loadedFile);
+        if (absPath != null) {
+          loadedFailures.put(loadedFile, absPath);
+        }
+      }
+    }
+    
+    // move failed files that were not loaded
+    for (String failure : failures.values()) {
+      Path orig = new Path(failure);
+      Path dest = new Path(error, orig.getName());
+      fs.rename(orig, dest);
+      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": failed");
+    }
+    
+    if (loadedFailures.size() > 0) {
+      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+          + Constants.ZBULK_FAILED_COPYQ);
+      
+      HashSet<String> workIds = new HashSet<String>();
+      
+      for (String failure : loadedFailures.values()) {
+        Path orig = new Path(failure);
+        Path dest = new Path(error, orig.getName());
+        
+        if (fs.exists(dest))
+          continue;
+        
+        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+        workIds.add(orig.getName());
+        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+      }
+      
+      bifCopyQueue.waitUntilDone(workIds);
+    }
+
+    fs.delete(new Path(error, "failures.txt"), true);
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
+  
 }
 
 class LoadFiles extends MasterRepo {
@@ -375,8 +484,7 @@ class LoadFiles extends MasterRepo {
   public Repo<Master> call(final long tid, final Master master) throws Exception {
     initializeThreadPool(master);
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-        ServerConfiguration.getSiteConfiguration()));
+    FileSystem fs = master.getFileSystem();
     List<FileStatus> files = new ArrayList<FileStatus>();
     for (FileStatus entry : fs.listStatus(new Path(bulk))) {
       files.add(entry);
@@ -448,23 +556,18 @@ class LoadFiles extends MasterRepo {
         UtilWaitThread.sleep(100);
       }
     }
-    // Copy/Create failed file markers
-    for (String f : filesToLoad) {
-      Path orig = new Path(f);
-      Path dest = new Path(errorDir, orig.getName());
-      try {
-        FileUtil.copy(fs, orig, fs, dest, false, true, CachedConfiguration.getInstance());
-        log.debug("tid " + tid + " copied " + orig + " to " + dest + ": failed");
-      } catch (IOException ex) {
-        try {
-          fs.create(dest).close();
-          log.debug("tid " + tid + " marked " + dest + " failed");
-        } catch (IOException e) {
-          log.error("Unable to create failure flag file " + dest, e);
-        }
+    
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
       }
+    } finally {
+      out.close();
     }
-    
+
     // return the next step, which will perform cleanup
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }

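A minimal sketch (not part of r1357909; class and method names here are illustrative) of the enqueue-and-wait pattern the new CopyFailed step uses: each failed-but-loaded bulk file becomes one work item keyed by its file name, the item's value encodes "source,destination", and the repo blocks until tablet servers report every id done.

import java.util.HashSet;
import java.util.Map;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.Path;

class BulkFailedCopyEnqueueSketch {
  void enqueue(Map<String,String> loadedFailures, String errorDir) throws Exception {
    DistributedWorkQueue copyQueue = new DistributedWorkQueue(Constants.ZROOT + "/"
        + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);

    HashSet<String> workIds = new HashSet<String>();
    for (String failure : loadedFailures.values()) {
      Path orig = new Path(failure);
      Path dest = new Path(errorDir, orig.getName());
      // the work item name doubles as the id waited on below
      copyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
      workIds.add(orig.getName());
    }
    // block until every queued copy has been processed by some tserver
    copyQueue.waitUntilDone(workIds);
  }
}
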
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java Thu Jul  5 21:03:16 2012
@@ -18,7 +18,7 @@ package org.apache.accumulo.server.maste
 
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.log4j.Logger;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java Thu Jul  5 21:03:16 2012
@@ -26,9 +26,9 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SecurityConstants;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Thu Jul  5 21:03:16 2012
@@ -41,16 +41,16 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MapCounter;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Thu Jul  5 21:03:16 2012
@@ -31,11 +31,11 @@ import org.apache.accumulo.core.file.Fil
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.Authenticator;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Thu Jul  5 21:03:16 2012
@@ -36,9 +36,9 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 import org.apache.accumulo.server.master.state.TabletLocationState;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java Thu Jul  5 21:03:16 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.log4j.Logger;
 

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java Thu Jul  5 21:03:16 2012
@@ -23,12 +23,12 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.log4j.Logger;
 
 public class RenameTable extends MasterRepo {

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Thu Jul  5 21:03:16 2012
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.master.state.MergeInfo.Operation;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java Thu Jul  5 21:03:16 2012
@@ -20,7 +20,7 @@ import org.apache.accumulo.cloudtrace.in
 import org.apache.accumulo.cloudtrace.instrument.Trace;
 import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
 
 
 /**

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java Thu Jul  5 21:03:16 2012
@@ -27,13 +27,13 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooReservation;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.DistributedReadWriteLock;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooQueueLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReservation;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter.Mutator;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java Thu Jul  5 21:03:16 2012
@@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.master.EventCoordinator.Listener;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.Logger;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Thu Jul  5 21:03:16 2012
@@ -44,9 +44,9 @@ abstract public class BasicServlet exten
   private static final long serialVersionUID = 1L;
   protected static final Logger log = Logger.getLogger(BasicServlet.class);
   static String cachedInstanceName = null;
-  private String bannerText;
-  private String bannerColor;
-  private String bannerBackground;
+  private static String bannerText;
+  private static String bannerColor;
+  private static String bannerBackground;
   
   abstract protected String getTitle(HttpServletRequest req);
   

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/DefaultServlet.java Thu Jul  5 21:03:16 2012
@@ -83,13 +83,17 @@ public class DefaultServlet extends Basi
       path = path.substring(1);
       InputStream data = BasicServlet.class.getClassLoader().getResourceAsStream(path);
       ServletOutputStream out = resp.getOutputStream();
-      if (data != null) {
-        byte[] buffer = new byte[1024];
-        int n;
-        while ((n = data.read(buffer)) > 0)
-          out.write(buffer, 0, n);
-      } else {
-        out.write(("could not get resource " + path + "").getBytes());
+      try {
+    	  if (data != null) {
+    		  byte[] buffer = new byte[1024];
+    		  int n;
+    		  while ((n = data.read(buffer)) > 0)
+    			  out.write(buffer, 0, n);
+    	  } else {
+    		  out.write(("could not get resource " + path + "").getBytes());
+    	  }
+      } finally {
+    	  data.close();
       }
     } catch (Throwable t) {
       log.error(t, t);
@@ -113,9 +117,10 @@ public class DefaultServlet extends Basi
       
       @Override
       public IOException run() {
+    	InputStream data = null;
         try {
           File file = new File(aHome + path);
-          InputStream data = new FileInputStream(file.getAbsolutePath());
+          data = new FileInputStream(file.getAbsolutePath());
           byte[] buffer = new byte[1024];
           int n;
           ServletOutputStream out = resp.getOutputStream();
@@ -124,6 +129,14 @@ public class DefaultServlet extends Basi
           return null;
         } catch (IOException e) {
           return e;
+        } finally {
+          if (data != null) {
+            try {
+              data.close();
+            } catch (IOException ex) {
+              log.error(ex, ex);
+            }
+          } 
         }
       }
     }, acc);

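The DefaultServlet change above wraps the classpath-resource copy in try/finally so the input stream is closed even when the copy fails. A generic sketch of that pattern (plain Java, not part of the commit):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class StreamCopySketch {
  // Copy in to out, always closing in, even if read/write throws.
  static void copyAndClose(InputStream in, OutputStream out) throws IOException {
    try {
      byte[] buffer = new byte[1024];
      int n;
      while ((n = in.read(buffer)) > 0)
        out.write(buffer, 0, n);
    } finally {
      if (in != null)
        in.close();
    }
  }
}
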
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/monitor/servlets/VisServlet.java Thu Jul  5 21:03:16 2012
@@ -32,11 +32,6 @@ public class VisServlet extends BasicSer
   private static final int concurrentScans = Monitor.getSystemConfiguration().getCount(Property.TSERV_READ_AHEAD_MAXCONCURRENT);
   
   private static final long serialVersionUID = 1L;
-  boolean useCircles;
-  StatType motion;
-  StatType color;
-  int spacing;
-  String url;
   
   public enum StatType {
     osload(ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(), true, 100, "OS Load"),
@@ -106,6 +101,14 @@ public class VisServlet extends BasicSer
       return count;
     }
   }
+
+  public static class VisualizationConfig {
+	  boolean useCircles = true;
+	  StatType motion  = StatType.allmax;
+	  StatType color = StatType.allavg;
+	  int spacing = 40;
+	  String url;
+  }
   
   @Override
   protected String getTitle(HttpServletRequest req) {
@@ -116,39 +119,36 @@ public class VisServlet extends BasicSer
   protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws IOException {
     StringBuffer urlsb = req.getRequestURL();
     urlsb.setLength(urlsb.lastIndexOf("/") + 1);
-    url = urlsb.toString();
+    String url = urlsb.toString();
+    VisualizationConfig cfg = new VisualizationConfig();
     
-    useCircles = true;
     String s = req.getParameter("shape");
     if (s != null && (s.equals("square") || s.equals("squares"))) {
-      useCircles = false;
+      cfg.useCircles = false;
     }
     
     s = req.getParameter("motion");
-    motion = StatType.allmax;
     if (s != null) {
       try {
-        motion = StatType.valueOf(s);
+        cfg.motion = StatType.valueOf(s);
       } catch (Exception e) {}
     }
     
     s = req.getParameter("color");
-    color = StatType.allavg;
     if (s != null) {
       try {
-        color = StatType.valueOf(s);
+        cfg.color = StatType.valueOf(s);
       } catch (Exception e) {}
     }
     
-    spacing = 40;
     String size = req.getParameter("size");
     if (size != null) {
       if (size.equals("10"))
-        spacing = 10;
+        cfg.spacing = 10;
       else if (size.equals("20"))
-        spacing = 20;
+        cfg.spacing = 20;
       else if (size.equals("80"))
-        spacing = 80;
+        cfg.spacing = 80;
     }
     
     ArrayList<TabletServerStatus> tservers = new ArrayList<TabletServerStatus>();
@@ -158,30 +158,30 @@ public class VisServlet extends BasicSer
     if (tservers.size() == 0)
       return;
     
-    int width = (int) Math.ceil(Math.sqrt(tservers.size())) * spacing;
-    int height = (int) Math.ceil(tservers.size() / width) * spacing;
-    doSettings(sb, width < 640 ? 640 : width, height < 640 ? 640 : height);
-    doScript(sb, tservers);
+    int width = (int) Math.ceil(Math.sqrt(tservers.size())) * cfg.spacing;
+    int height = (int) Math.ceil(tservers.size() / width) * cfg.spacing;
+    doSettings(sb, cfg, width < 640 ? 640 : width, height < 640 ? 640 : height);
+    doScript(sb, cfg, tservers);
   }
   
-  private void doSettings(StringBuilder sb, int width, int height) {
+  private void doSettings(StringBuilder sb, VisualizationConfig cfg, int width, int height) {
     sb.append("<div class='left'>\n");
     sb.append("<div id='parameters' class='nowrap'>\n");
     // shape select box
     sb.append("<span class='viscontrol'>Shape: <select id='shape' onchange='setShape(this)'><option>Circles</option><option")
-        .append(!useCircles ? " selected='true'" : "").append(">Squares</option></select></span>\n");
+        .append(!cfg.useCircles ? " selected='true'" : "").append(">Squares</option></select></span>\n");
     // size select box
-    sb.append("&nbsp;&nbsp<span class='viscontrol'>Size: <select id='size' onchange='setSize(this)'><option").append(spacing == 10 ? " selected='true'" : "")
-        .append(">10</option><option").append(spacing == 20 ? " selected='true'" : "").append(">20</option><option")
-        .append(spacing == 40 ? " selected='true'" : "").append(">40</option><option").append(spacing == 80 ? " selected='true'" : "")
+    sb.append("&nbsp;&nbsp<span class='viscontrol'>Size: <select id='size' onchange='setSize(this)'><option").append(cfg.spacing == 10 ? " selected='true'" : "")
+        .append(">10</option><option").append(cfg.spacing == 20 ? " selected='true'" : "").append(">20</option><option")
+        .append(cfg.spacing == 40 ? " selected='true'" : "").append(">40</option><option").append(cfg.spacing == 80 ? " selected='true'" : "")
         .append(">80</option></select></span>\n");
     // motion select box
     sb.append("&nbsp;&nbsp<span class='viscontrol'>Motion: <select id='motion' onchange='setMotion(this)'>");
-    addOptions(sb, motion);
+    addOptions(sb, cfg.motion);
     sb.append("</select></span>\n");
     // color select box
     sb.append("&nbsp;&nbsp<span class='viscontrol'>Color: <select id='color' onchange='setColor(this)'>");
-    addOptions(sb, color);
+    addOptions(sb, cfg.color);
     sb.append("</select></span>\n");
     sb.append("&nbsp;&nbsp<span class='viscontrol'>(hover for info, click for details)</span>");
     sb.append("</div>\n\n");
@@ -200,13 +200,13 @@ public class VisServlet extends BasicSer
     }
   }
   
-  private void doScript(StringBuilder sb, ArrayList<TabletServerStatus> tservers) {
+  private void doScript(StringBuilder sb, VisualizationConfig cfg, ArrayList<TabletServerStatus> tservers) {
     // initialization of some javascript variables
     sb.append("<script type='text/javascript'>\n");
     sb.append("var numCores = " + ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors() + ";\n");
-    sb.append("var jsonurl = '" + url + "json';\n");
-    sb.append("var visurl = '" + url + "vis';\n");
-    sb.append("var serverurl = '" + url + "tservers?s=';\n\n");
+    sb.append("var jsonurl = '" + cfg.url + "json';\n");
+    sb.append("var visurl = '" + cfg.url + "vis';\n");
+    sb.append("var serverurl = '" + cfg.url + "tservers?s=';\n\n");
     sb.append("// observable stats that can be connected to motion or color\n");
     sb.append("var statNames = {");
     for (StatType st : StatType.values())

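The VisServlet change moves request-derived settings out of servlet instance fields into a VisualizationConfig built per request. A servlet instance is shared by all concurrent requests, so mutable fields written from doGet/pageBody can be clobbered by overlapping requests; locals (or a per-request object) avoid that. Illustrative sketch only, not part of the commit:

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class PerRequestStateSketch extends HttpServlet {
  private static final long serialVersionUID = 1L;

  // BAD: a field like this is shared across all in-flight requests
  // private int spacing;

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
    int spacing = 40; // GOOD: local, one copy per request
    String size = req.getParameter("size");
    if ("10".equals(size))
      spacing = 10;
    // ... render the page using spacing ...
  }
}
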
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java Thu Jul  5 21:03:16 2012
@@ -31,8 +31,8 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.Encoding;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Thu Jul  5 21:03:16 2012
@@ -42,10 +42,10 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.io.Text;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java Thu Jul  5 21:03:16 2012
@@ -42,10 +42,10 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.Logger;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu Jul  5 21:03:16 2012
@@ -92,6 +92,7 @@ import org.apache.accumulo.core.util.Loc
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -120,7 +121,6 @@ import org.apache.accumulo.server.util.M
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.MetadataTable.LogEntry;
 import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
@@ -1165,8 +1165,7 @@ public class Tablet {
   }
   
   private static SortedMap<String,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent,
-      SortedMap<Key,Value> tabletsKeyValues)
-      throws IOException {
+      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
     Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString());
     
     TreeMap<String,DataFileValue> datafiles = new TreeMap<String,DataFileValue>();
@@ -1290,8 +1289,7 @@ public class Tablet {
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs,
       SortedMap<Key,Value> tabletsKeyValues) throws IOException {
     this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(),
-        location, fs, extent, tabletsKeyValues),
- lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent,
+        location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent,
         tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
   }
   

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu Jul  5 21:03:16 2012
@@ -53,6 +53,7 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,11 +127,15 @@ import org.apache.accumulo.core.util.Dae
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -183,12 +188,10 @@ import org.apache.accumulo.server.util.T
 import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.Platform;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -2569,6 +2572,8 @@ public class TabletServer extends Abstra
   
   private TServer server;
   
+  private DistributedWorkQueue bulkFailedCopyQ;
+  
   private static final String METRICS_PREFIX = "tserver";
   
   private static ObjectName OBJECT_NAME = null;
@@ -2708,13 +2713,23 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
+    
+    ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+
+    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
+    try {
+      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+    } catch (Exception e1) {
+      throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
+    }
+    
     try {
-      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
     } catch (Exception ex) {
       log.error("Error setting watches for recoveries");
       throw new RuntimeException(ex);
     }
-    
+
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.

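The tablet server now starts a DistributedWorkQueue consumer for the bulk-failed copy queue, sharing a SimpleThreadPool with log-sort recovery. The commit's real consumer is BulkFailedCopyProcessor, whose source is not in this part of the diff; the sketch below is a hypothetical stand-in that only shows the Processor contract (newProcessor()/process(child, data)) as it appears in the LogSorter change further down.

import java.util.concurrent.ThreadPoolExecutor;

import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;

// Hypothetical processor, for illustration only.
class NoOpCopyProcessor implements Processor {
  @Override
  public Processor newProcessor() {
    return new NoOpCopyProcessor(); // fresh instance per work item
  }

  @Override
  public void process(String child, byte[] data) {
    // data encodes "sourcePath,destPath" as written by the CopyFailed repo
    String[] paths = new String(data).split(",");
    System.out.println("would copy " + paths[0] + " -> " + paths[1]);
  }
}

class StartQueueSketch {
  static void start(String zooPath) throws Exception {
    ThreadPoolExecutor pool = new SimpleThreadPool(2, "distributed work queue");
    DistributedWorkQueue queue = new DistributedWorkQueue(zooPath);
    queue.startProcessing(new NoOpCopyProcessor(), pool);
  }
}
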
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu Jul  5 21:03:16 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
 /**
  * 
  */
 public class LogSorter {
   
+
   private static final Logger log = Logger.getLogger(LogSorter.class);
   FileSystem fs;
   AccumuloConfiguration conf;
   
-  private Map<String,Work> currentWork = new HashMap<String,Work>();
+  private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
 
-  class Work implements Runnable {
-    final String name;
-    FSDataInputStream input;
-    final String destPath;
-    long bytesCopied = -1;
-    long sortStart = 0;
-    long sortStop = -1;
-    private final LogSortNotifier cback;
+  class LogProcessor implements Processor {
     
-    synchronized long getBytesCopied() throws IOException {
-      return input == null ? bytesCopied : input.getPos();
+    private FSDataInputStream input;
+    private long bytesCopied = -1;
+    private long sortStart = 0;
+    private long sortStop = -1;
+    
+    @Override
+    public Processor newProcessor() {
+      return new LogProcessor();
+    }
+    
+    @Override
+    public void process(String child, byte[] data) {
+      String dest = Constants.getRecoveryDir(conf) + "/" + child;
+      String src = new String(data);
+      String name = new Path(src).getName();
+      
+      synchronized (currentWork) {
+        if (currentWork.containsKey(name))
+          return;
+        currentWork.put(name, this);
+      }
+      
+      try {
+        log.info("Copying " + src + " to " + dest);
+        sort(name, new Path(src), dest);
+      } finally {
+        currentWork.remove(name);
+      }
+      
     }
     
-    Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
-      this.name = name;
-      this.input = input;
-      this.destPath = destPath;
-      this.cback = cback;
-    }
-    synchronized boolean finished() {
-      return input == null;
-    }
-    public void run() {
-      sortStart = System.currentTimeMillis();
+    public void sort(String name, Path srcPath, String destPath) {
+
+      synchronized (this) {
+        sortStart = System.currentTimeMillis();
+      }
+
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
+        
+        // the following call does not throw an exception if the file/dir does not exist
+        fs.delete(new Path(destPath), true);
+
+        FSDataInputStream tmpInput = fs.open(srcPath);
+        synchronized (this) {
+          this.input = tmpInput;
+        }
+
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
         while (true) {
-          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
           try {
             long start = input.getPos();
             while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
               LogFileValue value = new LogFileValue();
               key.readFields(input);
               value.readFields(input);
-              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
             }
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             buffer.clear();
           } catch (EOFException ex) {
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             break;
           }
         }
         fs.create(new Path(destPath, "finished")).close();
-        log.debug("Log copy/sort of " + name + " complete");
+        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
+          // parent dir may not exist
+          fs.mkdirs(new Path(destPath));
           fs.create(new Path(destPath, "failed")).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
         log.error(t, t);
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
-        } catch (Exception ex) {
-          log.error("Strange error notifying the master of a logSort problem for file " + name);
-        }
       } finally {
         Thread.currentThread().setName(formerThreadName);
         try {
@@ -133,19 +147,13 @@ public class LogSorter {
         } catch (IOException e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
-        sortStop = System.currentTimeMillis();
-        synchronized (currentWork) {
-          currentWork.remove(name);
-        }
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
-        } catch (Exception ex) {
-          log.error("Strange error reporting successful log sort " + name, ex);
+        synchronized (this) {
+          sortStop = System.currentTimeMillis();
         }
       }
     }
     
-    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       String path = destPath + String.format("/part-r-%05d", part++);
       MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
       try {
@@ -162,7 +170,7 @@ public class LogSorter {
         output.close();
       }
     }
-    
+
     synchronized void close() throws IOException {
       bytesCopied = input.getPos();
       input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
       }
       return 0;
     }
-  };
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+  }
   
-  final ThreadPoolExecutor threadPool;
+  ThreadPoolExecutor threadPool;
   private Instance instance;
   
   public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
-  
-  public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
-    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
-    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    zoo.mkdirs(path);
-    List<String> children = zoo.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getType()) {
-          case NodeChildrenChanged:
-            if (event.getPath().equals(path))
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            else
-              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
-            break;
-          case NodeCreated:
-          case NodeDataChanged:
-          case NodeDeleted:
-          case None:
-            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
-            break;
-          
-        }
-      }
-    });
-    attemptRecoveries(zoo, serverName, path, children);
-    Random r = new Random();
-    // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    SimpleTimer.getInstance().schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-        } catch (KeeperException e) {
-          log.error("Unable to get recovery information", e);
-        } catch (InterruptedException e) {
-          log.info("Interrupted getting recovery information", e);
-        }        
-      }
-    }, r.nextInt(1000), 60 * 1000);
-  }
-  
-  private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
-    if (children.size() == 0)
-      return;
-    
-    if (threadPool.getQueue().size() > 1)
-      return;
 
-    log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
-    Random random = new Random();
-    Collections.shuffle(children, random);
-    try {
-      for (String child : children) {
-        final String childPath = path + "/" + child;
-        log.debug("Attempting to lock " + child);
-        ZooLock lock = new ZooLock(childPath);
-        if (lock.tryLock(new LockWatcher() {
-          @Override
-          public void lostLock(LockLossReason reason) {
-            log.info("Ignoring lost lock event, reason " + reason);
-          }
-        }, serverName.getBytes())) {
-          // Great... we got the lock, but maybe we're too busy
-          if (threadPool.getQueue().size() > 1) {
-            lock.unlock();
-            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
-            break;
-          }
-          log.debug("got lock for " + child);
-          byte[] contents = zoo.getData(childPath, null);
-          String destination = Constants.getRecoveryDir(conf) + "/" + child;
-          startSort(new String(contents), destination, new LogSortNotifier() {
-            @Override
-            public void notice(String name, long bytes, int parts, long milliseconds, String error) {
-              log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
-              try {
-                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
-              } catch (Exception e) {
-                log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
-              }
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            }
-          });
-        } else {
-          log.debug("failed to get the lock " + child);
-        }
-      }
-    } catch (Throwable t) {
-      log.error("Unexpected error", t);
-    }
-  }
-
-  public interface LogSortNotifier {
-    public void notice(String name, long bytes, int parts, long milliseconds, String error);
-  }
-
-  private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
-    log.info("Copying " + src + " to " + dest);
-    fs.delete(new Path(dest), true);
-    Path srcPath = new Path(src);
-    synchronized (currentWork) {
-      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
-      if (!currentWork.containsKey(srcPath.getName())) {
-        threadPool.execute(work);
-        currentWork.put(srcPath.getName(), work);
-      }
-    }
+  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+    this.threadPool = distWorkQThreadPool;
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
   }
   
   public List<RecoveryStatus> getLogSorts() {
     List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
     synchronized (currentWork) {
-      for (Entry<String,Work> entries : currentWork.entrySet()) {
+      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
         RecoveryStatus status = new RecoveryStatus();
         status.name = entries.getKey();
         try {

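The LogSorter hunks above replace the per-tabletserver ZooKeeper watcher, retry timer, and ZooLock negotiation with a DistributedWorkQueue whose Processor callback is cloned for each queued recovery log. A minimal sketch of that callback pattern, using only the newProcessor()/process() methods and the startProcessing() registration visible in the diff (class and path names here are illustrative, not the committed code):

    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;

    public class ExampleLogProcessor implements Processor {

      // The work queue asks for a fresh Processor per work item.
      @Override
      public Processor newProcessor() {
        return new ExampleLogProcessor();
      }

      // child is the ZooKeeper child node name; data is its payload,
      // which for recovery entries carries the source log path.
      @Override
      public void process(String child, byte[] data) {
        String src = new String(data);
        System.out.println("would sort " + src + " for entry " + child);
      }
    }

    // Registration, as in startWatchingForRecoveryLogs() above:
    //   new DistributedWorkQueue(zooRoot + Constants.ZRECOVERY)
    //       .startProcessing(new ExampleLogProcessor(), threadPool);
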
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java Thu Jul  5 21:03:16 2012
@@ -45,7 +45,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.Authenticator;
 import org.apache.accumulo.server.security.ZKAuthenticator;
@@ -258,7 +258,7 @@ public class TestIngest {
     try {
       if (ingestArgs.trace) {
         String name = TestIngest.class.getSimpleName();
-        DistributedTrace.enable(instance, new ZooReader(instance), name, null);
+        DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null);
         Trace.on(name);
         Trace.currentTrace().data("cmdLine", Arrays.asList(args).toString());
       }

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java Thu Jul  5 21:03:16 2012
@@ -33,7 +33,7 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.test.TestIngest.IngestArgs;
 import org.apache.hadoop.io.Text;
@@ -60,7 +60,7 @@ public class VerifyIngest {
     try {
       if (ingestArgs.trace) {
         String name = VerifyIngest.class.getSimpleName();
-        DistributedTrace.enable(instance, new ZooReader(instance), name, null);
+        DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null);
         Trace.on(name);
         Trace.currentTrace().data("cmdLine", Arrays.asList(args).toString());
       }

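Both TestIngest and VerifyIngest switch from the removed core ZooReader(Instance) constructor to the fate ZooReader, which is built from the ZooKeeper connect string and session timeout. A small self-contained sketch of the updated trace setup, assuming an Instance is already available (the wrapper class and method name are illustrative):

    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.trace.DistributedTrace;
    import org.apache.accumulo.fate.zookeeper.ZooReader;

    class TraceSetup {
      static void enableTracing(Instance instance, String name) throws Exception {
        // The fate ZooReader no longer accepts an Instance directly.
        ZooReader zoo = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
        DistributedTrace.enable(instance, zoo, name, null);
      }
    }
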
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java Thu Jul  5 21:03:16 2012
@@ -80,46 +80,46 @@ public class UndefinedAnalyzer {
     
     private void parseLog(File log) throws Exception {
       BufferedReader reader = new BufferedReader(new FileReader(log));
-      
       String line;
       TreeMap<Long,Long> tm = null;
-      
-      while ((line = reader.readLine()) != null) {
-        if (!line.startsWith("UUID"))
-          continue;
-        String[] tokens = line.split("\\s");
-        String time = tokens[1];
-        String uuid = tokens[2];
-        
-        if (flushes.containsKey(uuid)) {
-          System.err.println("WARN Duplicate uuid " + log);
+      try {
+        while ((line = reader.readLine()) != null) {
+          if (!line.startsWith("UUID"))
+            continue;
+          String[] tokens = line.split("\\s");
+          String time = tokens[1];
+          String uuid = tokens[2];
+          
+          if (flushes.containsKey(uuid)) {
+            System.err.println("WARN Duplicate uuid " + log);
+            return;
+          }
+          
+          tm = new TreeMap<Long,Long>(Collections.reverseOrder());
+          tm.put(0l, Long.parseLong(time));
+          flushes.put(uuid, tm);
+          break;
+          
+        }
+        if (tm == null) {
+          System.err.println("WARN Bad ingest log " + log);
           return;
         }
         
-        tm = new TreeMap<Long,Long>(Collections.reverseOrder());
-        tm.put(0l, Long.parseLong(time));
-        flushes.put(uuid, tm);
-        break;
-        
-      }
-      
-      if (tm == null) {
-        System.err.println("WARN Bad ingest log " + log);
-        return;
-      }
-      
-      while ((line = reader.readLine()) != null) {
-        String[] tokens = line.split("\\s");
-        
-        if (!tokens[0].equals("FLUSH"))
-          continue;
-        
-        String time = tokens[1];
-        String count = tokens[4];
-        
-        tm.put(Long.parseLong(count), Long.parseLong(time));
+        while ((line = reader.readLine()) != null) {
+          String[] tokens = line.split("\\s");
+          
+          if (!tokens[0].equals("FLUSH"))
+            continue;
+          
+          String time = tokens[1];
+          String count = tokens[4];
+          
+          tm.put(Long.parseLong(count), Long.parseLong(time));
+        }
+      } finally {
+        reader.close();
       }
-      
     }
     
     Iterator<Long> getTimes(String uuid, long count) {
@@ -172,45 +172,49 @@ public class UndefinedAnalyzer {
         
         BufferedReader reader = new BufferedReader(new FileReader(masterLog));
         String line;
-        while ((line = reader.readLine()) != null) {
-          if (line.contains("TABLET_LOADED")) {
-            String[] tokens = line.split("\\s+");
-            String tablet = tokens[8];
-            String server = tokens[10];
-            
-            int pos1 = -1;
-            int pos2 = -1;
-            int pos3 = -1;
-            
-            for (int i = 0; i < tablet.length(); i++) {
-              if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
-                if (pos1 == -1) {
-                  pos1 = i;
-                } else if (pos2 == -1) {
-                  pos2 = i;
-                } else {
-                  pos3 = i;
+        try {
+          while ((line = reader.readLine()) != null) {
+            if (line.contains("TABLET_LOADED")) {
+              String[] tokens = line.split("\\s+");
+              String tablet = tokens[8];
+              String server = tokens[10];
+              
+              int pos1 = -1;
+              int pos2 = -1;
+              int pos3 = -1;
+              
+              for (int i = 0; i < tablet.length(); i++) {
+                if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+                  if (pos1 == -1) {
+                    pos1 = i;
+                  } else if (pos2 == -1) {
+                    pos2 = i;
+                  } else {
+                    pos3 = i;
+                  }
                 }
               }
-            }
-            
-            if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
-              String tid = tablet.substring(0, pos1);
-              String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
-              String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
-              if (tid.equals(tableId)) {
-                // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
-                Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
-                // System.out.println(" "+date);
-                
-                assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
-                
+              
+              if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+                String tid = tablet.substring(0, pos1);
+                String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
+                String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+                if (tid.equals(tableId)) {
+                  // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
+                  Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
+                  // System.out.println(" "+date);
+                  
+                  assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+                  
+                }
+              } else if (!tablet.startsWith("!0")) {
+                System.err.println("Cannot parse tablet " + tablet);
               }
-            } else if (!tablet.startsWith("!0")) {
-              System.err.println("Cannot parse tablet " + tablet);
+              
             }
-            
           }
+        } finally {
+          reader.close();
         }
       }
     }

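The UndefinedAnalyzer hunks wrap both log-parsing loops in try/finally so the BufferedReader is closed even when parsing returns early or fails. The same idiom in isolation (class and method names are illustrative):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    class LineScanner {
      static int countLines(String path) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(path));
        try {
          int n = 0;
          while (reader.readLine() != null)
            n++;
          return n;
        } finally {
          // Runs on normal return, early return, and exceptions alike.
          reader.close();
        }
      }
    }
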
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java Thu Jul  5 21:03:16 2012
@@ -18,8 +18,8 @@ package org.apache.accumulo.server.test.
 
 import java.io.File;
 
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 
 public class CacheTestClean {

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java Thu Jul  5 21:03:16 2012
@@ -25,7 +25,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 
 public class CacheTestReader {
   public static void main(String[] args) throws Exception {
@@ -70,6 +70,7 @@ public class CacheTestReader {
       
       oos.writeObject(readData);
       
+      fos.close();
       oos.close();
       
       UtilWaitThread.sleep(20);

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java Thu Jul  5 21:03:16 2012
@@ -27,9 +27,9 @@ import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 
 public class CacheTestWriter {
@@ -132,6 +132,7 @@ public class CacheTestWriter {
               @SuppressWarnings("unchecked")
               Map<String,String> readerMap = (Map<String,String>) ois.readObject();
               
+              fis.close();
               ois.close();
               
               System.out.println("read " + readerMap);

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Thu Jul  5 21:03:16 2012
@@ -38,7 +38,10 @@ import org.apache.accumulo.core.file.rfi
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -46,10 +49,7 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.tabletserver.TabletServer;
 import org.apache.accumulo.server.tabletserver.TabletTime;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.io.Text;
 

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java Thu Jul  5 21:03:16 2012
@@ -34,14 +34,14 @@ import org.apache.accumulo.core.util.Ser
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.util.TServerUtils;
 import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java Thu Jul  5 21:03:16 2012
@@ -103,7 +103,9 @@ public class Framework {
     String module = args[3];
     
     Properties props = new Properties();
-    props.load(new FileInputStream(configDir + "/randomwalk.conf"));
+    FileInputStream fis = new FileInputStream(configDir + "/randomwalk.conf");
+    props.load(fis);
+    fis.close();
     
     System.setProperty("localLog", localLogPath + "/" + logId);
     System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + logId);

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java Thu Jul  5 21:03:16 2012
@@ -139,7 +139,8 @@ public class State {
     files = libdir.list();
     for (int i = 0; i < files.length; i++) {
       String f = files[i];
-      if (f.matches("^accumulo-core-.+jar$") || f.matches("^accumulo-server-.+jar$") || f.matches("^cloudtrace-.+jar$") || f.matches("^libthrift-.+jar$")) {
+      if (f.matches("^accumulo-core-.+jar$") || f.matches("^accumulo-server-.+jar$") || f.matches("^accumulo-fate-.+jar$") || f.matches("^cloudtrace-.+jar$")
+          || f.matches("^libthrift-.+jar$")) {
         if (retval == null) {
           retval = String.format("%s/%s", libdir.getAbsolutePath(), f);
         } else {

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java Thu Jul  5 21:03:16 2012
@@ -31,10 +31,6 @@ import org.apache.accumulo.server.test.r
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
-/**
- * @author jwvines
- * 
- */
 public class SecurityHelper {
   protected final static Logger log = Logger.getLogger(SecurityHelper.class);
   

Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java Thu Jul  5 21:03:16 2012
@@ -52,8 +52,11 @@ public class Run {
     Properties scaleProps = new Properties();
     Properties testProps = new Properties();
     try {
-      scaleProps.load(new FileInputStream(sitePath));
-      testProps.load(new FileInputStream(testPath));
+      FileInputStream fis = new FileInputStream(sitePath);
+      scaleProps.load(fis);
+      fis.close();
+      fis = new FileInputStream(testPath);
+      testProps.load(fis);
     } catch (Exception e) {
       System.out.println("Problem loading config file");
       e.printStackTrace();

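Framework.java and Run.java above stop leaking the FileInputStream used to load a Properties file by closing it explicitly after load(). An illustrative variant of the same cleanup with the close moved into a finally block (a sketch, not the committed code):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    class PropsLoader {
      static Properties load(String path) throws IOException {
        Properties props = new Properties();
        FileInputStream fis = new FileInputStream(path);
        try {
          props.load(fis);
        } finally {
          // Stream is released even if load() throws.
          fis.close();
        }
        return props;
      }
    }
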
Modified: accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1357909&r1=1357908&r2=1357909&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/branches/ACCUMULO-652/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Thu Jul  5 21:03:16 2012
@@ -39,12 +39,12 @@ import org.apache.accumulo.core.util.Add
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;