You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2012/06/25 19:09:39 UTC

svn commit: r1353663 [5/6] - in /accumulo/branches/ACCUMULO-259: ./ bin/ conf/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/512MB/native-standalone/ conf/examples/512MB/standalone/ core/ core/src/main/java/org/apache...

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Mon Jun 25 17:09:31 2012
@@ -56,10 +56,10 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
 import org.apache.accumulo.server.monitor.servlets.JSONServlet;
 import org.apache.accumulo.server.monitor.servlets.LogServlet;
-import org.apache.accumulo.server.monitor.servlets.LoggersServlet;
 import org.apache.accumulo.server.monitor.servlets.MasterServlet;
 import org.apache.accumulo.server.monitor.servlets.OperationServlet;
 import org.apache.accumulo.server.monitor.servlets.ProblemServlet;
+import org.apache.accumulo.server.monitor.servlets.ShellServlet;
 import org.apache.accumulo.server.monitor.servlets.TServersServlet;
 import org.apache.accumulo.server.monitor.servlets.TablesServlet;
 import org.apache.accumulo.server.monitor.servlets.VisServlet;
@@ -148,6 +148,8 @@ public class Monitor {
   
   private static ServerConfiguration config;
   
+  private static EmbeddedWebServer server;
+  
   public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
     Map<String,Double> compactingByTable = new HashMap<String,Double>();
     if (mmi != null && mmi.tServerInfo != null) {
@@ -307,85 +309,84 @@ public class Monitor {
         if (mmi == null)
           UtilWaitThread.sleep(1000);
       }
-      
-      int majorCompactions = 0;
-      int minorCompactions = 0;
-      
-      lookupRateTracker.startingUpdates();
-      indexCacheHitTracker.startingUpdates();
-      indexCacheRequestTracker.startingUpdates();
-      dataCacheHitTracker.startingUpdates();
-      dataCacheRequestTracker.startingUpdates();
-      
-      for (TabletServerStatus server : mmi.tServerInfo) {
-        TableInfo summary = Monitor.summarizeTableStats(server);
-        totalIngestRate += summary.ingestRate;
-        totalIngestByteRate += summary.ingestByteRate;
-        totalQueryRate += summary.queryRate;
-        totalScanRate += summary.scanRate;
-        totalQueryByteRate += summary.queryByteRate;
-        totalEntries += summary.recs;
-        totalHoldTime += server.holdTime;
-        totalLookups += server.lookups;
-        majorCompactions += summary.major.running;
-        minorCompactions += summary.minor.running;
-        lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
-        indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
-        indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
-        dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
-        dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
-      }
-      
-      lookupRateTracker.finishedUpdating();
-      indexCacheHitTracker.finishedUpdating();
-      indexCacheRequestTracker.finishedUpdating();
-      dataCacheHitTracker.finishedUpdating();
-      dataCacheRequestTracker.finishedUpdating();
-      
-      int totalTables = 0;
-      for (TableInfo tInfo : mmi.tableMap.values()) {
-        totalTabletCount += tInfo.tablets;
-        onlineTabletCount += tInfo.onlineTablets;
-        totalTables++;
-      }
-      Monitor.totalIngestRate = totalIngestRate;
-      Monitor.totalTables = totalTables;
-      totalIngestByteRate = totalIngestByteRate / 1000000.0;
-      Monitor.totalIngestByteRate = totalIngestByteRate;
-      Monitor.totalQueryRate = totalQueryRate;
-      Monitor.totalScanRate = totalScanRate;
-      totalQueryByteRate = totalQueryByteRate / 1000000.0;
-      Monitor.totalQueryByteRate = totalQueryByteRate;
-      Monitor.totalEntries = totalEntries;
-      Monitor.totalTabletCount = totalTabletCount;
-      Monitor.onlineTabletCount = onlineTabletCount;
-      Monitor.totalHoldTime = totalHoldTime;
-      Monitor.totalLookups = totalLookups;
-      
-      ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
-      ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
-      recoveriesOverTime.add(new Pair<Long,Integer>(currentTime, mmi.recovery.size()));
-      
-      double totalLoad = 0.;
-      for (TabletServerStatus status : mmi.tServerInfo) {
-        if (status != null)
-          totalLoad += status.osLoad;
-      }
-      loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
-      
-      minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
-      majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
-      
-      lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
-      
-      queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
-      queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
-      
-      scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+      if (mmi != null) {
+        int majorCompactions = 0;
+        int minorCompactions = 0;
+        
+        lookupRateTracker.startingUpdates();
+        indexCacheHitTracker.startingUpdates();
+        indexCacheRequestTracker.startingUpdates();
+        dataCacheHitTracker.startingUpdates();
+        dataCacheRequestTracker.startingUpdates();
 
-      calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
-      calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
-      
+        for (TabletServerStatus server : mmi.tServerInfo) {
+          TableInfo summary = Monitor.summarizeTableStats(server);
+          totalIngestRate += summary.ingestRate;
+          totalIngestByteRate += summary.ingestByteRate;
+          totalQueryRate += summary.queryRate;
+          totalScanRate += summary.scanRate;
+          totalQueryByteRate += summary.queryByteRate;
+          totalEntries += summary.recs;
+          totalHoldTime += server.holdTime;
+          totalLookups += server.lookups;
+          majorCompactions += summary.major.running;
+          minorCompactions += summary.minor.running;
+          lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
+          indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
+          indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
+          dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
+          dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+        }
+        
+        lookupRateTracker.finishedUpdating();
+        indexCacheHitTracker.finishedUpdating();
+        indexCacheRequestTracker.finishedUpdating();
+        dataCacheHitTracker.finishedUpdating();
+        dataCacheRequestTracker.finishedUpdating();
+        
+        int totalTables = 0;
+        for (TableInfo tInfo : mmi.tableMap.values()) {
+          totalTabletCount += tInfo.tablets;
+          onlineTabletCount += tInfo.onlineTablets;
+          totalTables++;
+        }
+        Monitor.totalIngestRate = totalIngestRate;
+        Monitor.totalTables = totalTables;
+        totalIngestByteRate = totalIngestByteRate / 1000000.0;
+        Monitor.totalIngestByteRate = totalIngestByteRate;
+        Monitor.totalQueryRate = totalQueryRate;
+        Monitor.totalScanRate = totalScanRate;
+        totalQueryByteRate = totalQueryByteRate / 1000000.0;
+        Monitor.totalQueryByteRate = totalQueryByteRate;
+        Monitor.totalEntries = totalEntries;
+        Monitor.totalTabletCount = totalTabletCount;
+        Monitor.onlineTabletCount = onlineTabletCount;
+        Monitor.totalHoldTime = totalHoldTime;
+        Monitor.totalLookups = totalLookups;
+        
+        ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
+        ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
+        
+        double totalLoad = 0.;
+        for (TabletServerStatus status : mmi.tServerInfo) {
+          if (status != null)
+            totalLoad += status.osLoad;
+        }
+        loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
+        
+        minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
+        majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
+        
+        lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
+        
+        queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
+        queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
+        
+        scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+        
+        calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
+        calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
+      }
       try {
         Monitor.problemSummary = ProblemReports.getInstance().summarize();
         Monitor.problemException = null;
@@ -462,7 +463,6 @@ public class Monitor {
   public void run(String hostname) {
     Monitor.START_TIME = System.currentTimeMillis();
     int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
-    EmbeddedWebServer server;
     try {
       log.debug("Creating monitor on port " + port);
       server = EmbeddedWebServer.create(port);
@@ -476,7 +476,6 @@ public class Monitor {
     server.addServlet(MasterServlet.class, "/master");
     server.addServlet(TablesServlet.class, "/tables");
     server.addServlet(TServersServlet.class, "/tservers");
-    server.addServlet(LoggersServlet.class, "/loggers");
     server.addServlet(ProblemServlet.class, "/problems");
     server.addServlet(GcStatusServlet.class, "/gc");
     server.addServlet(LogServlet.class, "/log");
@@ -486,6 +485,8 @@ public class Monitor {
     server.addServlet(Summary.class, "/trace/summary");
     server.addServlet(ListType.class, "/trace/listType");
     server.addServlet(ShowTrace.class, "/trace/show");
+    if (server.isUsingSsl())
+      server.addServlet(ShellServlet.class, "/shell");
     LogService.startLogListener(Monitor.getSystemConfiguration());
     server.start();
     
@@ -545,7 +546,7 @@ public class Monitor {
   public static double getTotalScanRate() {
     return totalScanRate;
   }
-
+  
   public static double getTotalQueryByteRate() {
     return totalQueryByteRate;
   }
@@ -631,7 +632,7 @@ public class Monitor {
       return new ArrayList<Pair<Long,Integer>>(scanRateOverTime);
     }
   }
-
+  
   public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
     synchronized (queryByteRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(queryByteRateOverTime);
@@ -657,4 +658,8 @@ public class Monitor {
   public static Instance getInstance() {
     return instance;
   }
+  
+  public static boolean isUsingSsl() {
+    return server.isUsingSsl();
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Mon Jun 25 17:09:31 2012
@@ -73,7 +73,7 @@ abstract public class BasicServlet exten
     }
   }
   
-  protected final void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
     doGet(req, resp);
   }
   
@@ -136,7 +136,8 @@ abstract public class BasicServlet exten
     // BEGIN HEADER
     sb.append("<head>\n");
     sb.append("<title>").append(getTitle(req)).append(" - Accumulo ").append(Constants.VERSION).append("</title>\n");
-    if ((refresh > 0) && (req.getRequestURI().startsWith("/docs") == false) && (req.getRequestURI().startsWith("/vis") == false))
+    if ((refresh > 0) && (req.getRequestURI().startsWith("/docs") == false) && (req.getRequestURI().startsWith("/vis") == false)
+        && (req.getRequestURI().startsWith("/shell") == false))
       sb.append("<meta http-equiv='refresh' content='" + refresh + "' />\n");
     sb.append("<meta http-equiv='Content-Type' content='").append(DEFAULT_CONTENT_TYPE).append("' />\n");
     sb.append("<meta http-equiv='Content-Script-Type' content='text/javascript' />\n");
@@ -175,7 +176,6 @@ abstract public class BasicServlet exten
     sb.append("<hr />\n");
     sb.append("<a href='/master'>Master&nbsp;Server</a><br />\n");
     sb.append("<a href='/tservers'>Tablet&nbsp;Servers</a><br />\n");
-    sb.append("<a href='/loggers'>Logger&nbsp;Servers</a><br />\n");
     sb.append("<a href='/vis'>Server Activity</a><br />\n");
     sb.append("<a href='/gc'>Garbage&nbsp;Collector</a><br />\n");
     sb.append("<a href='/tables'>Tables</a><br />\n");
@@ -190,6 +190,8 @@ abstract public class BasicServlet exten
     sb.append("<hr />\n");
     sb.append("<a href='/xml'>XML</a><br />\n");
     sb.append("<a href='/json'>JSON</a><hr />\n");
+    if (Monitor.isUsingSsl())
+      sb.append("<a href='/shell'>Shell</a><hr />\n");
     sb.append("<div class='smalltext'>[<a href='").append("/op?action=refresh&value=").append(refresh < 1 ? "5" : "-1");
     sb.append("&redir=").append(currentPage(req)).append("'>");
     sb.append(refresh < 1 ? "en" : "dis").append("able&nbsp;auto-refresh</a>]</div>\n");

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java Mon Jun 25 17:09:31 2012
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.master.thrift.MasterState;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.monitor.DedupedLogEvent;
@@ -43,9 +42,9 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.monitor.util.Table;
 import org.apache.accumulo.server.monitor.util.TableRow;
 import org.apache.accumulo.server.monitor.util.celltypes.DurationType;
-import org.apache.accumulo.server.monitor.util.celltypes.LoggerLinkType;
 import org.apache.accumulo.server.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.monitor.util.celltypes.ProgressChartType;
+import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.log4j.Level;
 
 public class MasterServlet extends BasicServlet {
@@ -57,7 +56,7 @@ public class MasterServlet extends Basic
   @Override
   protected String getTitle(HttpServletRequest req) {
     List<String> masters = Monitor.getInstance().getMasterLocations();
-    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
   }
   
   @Override
@@ -137,8 +136,6 @@ public class MasterServlet extends Basic
       masterStatus.addSortableColumn("#&nbsp;Online<br />Tablet&nbsp;Servers", new NumberType<Integer>((int) (slaves.size() * 0.8 + 1.0), slaves.size(),
           (int) (slaves.size() * 0.6 + 1.0), slaves.size()), "Number of tablet servers currently available");
       masterStatus.addSortableColumn("#&nbsp;Total<br />Tablet&nbsp;Servers", new NumberType<Integer>(), "The total number of tablet servers configured");
-      masterStatus.addSortableColumn("Loggers", new NumberType<Integer>((int) (slaves.size() * .8), Integer.MAX_VALUE, 1, Integer.MAX_VALUE),
-          "The number of write-ahead loggers.  This should be approximately the same as the number of tablet servers (and greater than zero).");
       masterStatus.addSortableColumn("Last&nbsp;GC", null, "The last time files were cleaned-up from HDFS.");
       masterStatus.addSortableColumn("#&nbsp;Tablets", new NumberType<Integer>(0, Integer.MAX_VALUE, 2, Integer.MAX_VALUE), null);
       masterStatus.addSortableColumn("#&nbsp;Unassigned<br />Tablets", new NumberType<Integer>(0, 0), null);
@@ -153,10 +150,9 @@ public class MasterServlet extends Basic
       masterStatus.addSortableColumn("OS&nbsp;Load", new NumberType<Double>(0., guessHighLoad * 1., 0., guessHighLoad * 3.),
           "The one-minute load average on the computer that runs the monitor web server.");
       TableRow row = masterStatus.prepareRow();
-      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
       row.add(Monitor.getMmi().tServerInfo.size());
       row.add(slaves.size());
-      row.add(Monitor.getMmi().loggers.size());
       row.add("<a href='/gc'>" + gcStatus + "</a>");
       row.add(Monitor.getTotalTabletCount());
       row.add(Monitor.getMmi().unassignedTablets);
@@ -177,25 +173,28 @@ public class MasterServlet extends Basic
   private void doRecoveryList(HttpServletRequest req, StringBuilder sb) {
     MasterMonitorInfo mmi = Monitor.getMmi();
     if (mmi != null) {
-      List<RecoveryStatus> jobs = mmi.recovery;
-      if (jobs != null && jobs.size() > 0) {
-        Table recoveryTable = new Table("logRecovery", "Log&nbsp;Recovery");
-        recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
-        recoveryTable.addSortableColumn("Server", new LoggerLinkType(), null);
-        recoveryTable.addSortableColumn("Log");
-        recoveryTable.addSortableColumn("Time", new DurationType(), null);
-        recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
-        
-        for (RecoveryStatus recovery : jobs) {
-          TableRow row = recoveryTable.prepareRow();
-          row.add(recovery);
-          row.add(recovery.name);
-          row.add((long) recovery.runtime);
-          row.add(recovery.copyProgress);
-          recoveryTable.addRow(row);
+      Table recoveryTable = new Table("logRecovery", "Log&nbsp;Recovery");
+      recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
+      recoveryTable.addSortableColumn("Server");
+      recoveryTable.addSortableColumn("Log");
+      recoveryTable.addSortableColumn("Time", new DurationType(), null);
+      recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
+      int rows = 0;
+      for (TabletServerStatus server : mmi.tServerInfo) {
+        if (server.logSorts != null) {
+          for (RecoveryStatus recovery : server.logSorts) {
+            TableRow row = recoveryTable.prepareRow();
+            row.add(AddressUtil.parseAddress(server.name, Property.TSERV_CLIENTPORT).getHostName());
+            row.add(recovery.name);
+            row.add((long) recovery.runtime);
+            row.add(recovery.progress);
+            recoveryTable.addRow(row);
+            rows++;
+          }
         }
-        recoveryTable.generate(req, sb);
       }
+      if (rows > 0)
+        recoveryTable.generate(req, sb);
     }
   }
   

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java Mon Jun 25 17:09:31 2012
@@ -167,8 +167,6 @@ public class OperationServlet extends Ba
       // a dead server should have a uniq address: a logger or tserver
       DeadServerList obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADTSERVERS);
       obit.delete(server);
-      obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADLOGGERS);
-      obit.delete(server);
     }
   }
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java Mon Jun 25 17:09:31 2012
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.DeadServer;
-import org.apache.accumulo.core.master.thrift.LoggerStatus;
 import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -80,12 +79,6 @@ public class XMLServlet extends BasicSer
       sb.append("</compactions>\n");
       
       sb.append("<tablets>").append(summary.tablets).append("</tablets>\n");
-      if (status.loggers != null) {
-        sb.append("<loggers>");
-        for (String logger : status.loggers)
-          sb.append("<logger>" + logger + "</logger>");
-        sb.append("</loggers>");
-      }
       
       sb.append("<ingest>").append(summary.ingestRate).append("</ingest>\n");
       sb.append("<query>").append(summary.queryRate).append("</query>\n");
@@ -110,12 +103,6 @@ public class XMLServlet extends BasicSer
     }
     sb.append("\n</badTabletServers>\n");
     
-    sb.append("\n<loggers>\n");
-    for (LoggerStatus entry : Monitor.getMmi().loggers) {
-      sb.append(String.format("<logger id='%s'/>\n", entry.logger));
-    }
-    sb.append("\n</loggers>\n");
-    
     sb.append("\n<tabletServersShuttingDown>\n");
     for (String server : Monitor.getMmi().serversShuttingDown) {
       sb.append(String.format("<server id='%s'/>\n", server));

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Mon Jun 25 17:09:31 2012
@@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyIterator;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.collections.map.LRUMap;

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Mon Jun 25 17:09:31 2012
@@ -79,7 +79,7 @@ public class Compactor implements Callab
   private CompactionEnv env;
   private Configuration conf;
   private FileSystem fs;
-  private KeyExtent extent;
+  protected KeyExtent extent;
   private List<IteratorSetting> iterators;
   
   Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Mon Jun 25 17:09:31 2012
@@ -22,10 +22,13 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
@@ -64,6 +67,15 @@ public class MinorCompactor extends Comp
     });
   }
   
+  private boolean isTableDeleting() {
+    try {
+      return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
+    } catch (Exception e) {
+      log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
+      return false; // cannot get positive confirmation that it's deleting.
+    }
+  }
+
   @Override
   public CompactionStats call() {
     log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
@@ -75,7 +87,6 @@ public class MinorCompactor extends Comp
     boolean reportedProblem = false;
     
     do {
-      
       try {
         CompactionStats ret = super.call();
         
@@ -117,6 +128,9 @@ public class MinorCompactor extends Comp
         log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
       }
       
+      if (isTableDeleting())
+        return new CompactionStats(0, 0);
+
     } while (true);
   }
   

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon Jun 25 17:09:31 2012
@@ -111,8 +111,8 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.server.trace.TraceFileSystem;
@@ -209,7 +209,7 @@ public class Tablet {
       return Tablet.this;
     }
     
-    public boolean beginUpdatingLogsUsed(ArrayList<RemoteLogger> copy, boolean mincFinish) {
+    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
       return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
     }
     
@@ -1178,7 +1178,6 @@ public class Tablet {
       for (int i = 0; i < files.length; i++) {
         paths[i] = files[i].getPath();
       }
-      log.debug("fs " + fs + " files: " + Arrays.toString(paths) + " location: " + location);
       Collection<String> goodPaths = cleanUpFiles(fs, files, location, true);
       for (String path : goodPaths) {
         String filename = new Path(path).getName();
@@ -1226,17 +1225,6 @@ public class Tablet {
     return datafiles;
   }
   
-  private static Set<RemoteLogger> getCurrentLoggers(List<LogEntry> entries) {
-    Set<RemoteLogger> result = new HashSet<RemoteLogger>();
-    for (LogEntry logEntry : entries) {
-      for (String log : logEntry.logSet) {
-        String[] parts = log.split("/", 2);
-        result.add(new RemoteLogger(parts[0], parts[1], null));
-      }
-    }
-    return result;
-  }
-  
   private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
     List<LogEntry> logEntries = new ArrayList<LogEntry>();
     
@@ -1470,7 +1458,15 @@ public class Tablet {
           throw new RuntimeException(t);
         }
       }
-      currentLogs = getCurrentLoggers(logEntries);
+      // make some closed references that represent the recovered logs
+      currentLogs = new HashSet<DfsLogger>();
+      for (LogEntry logEntry : logEntries) {
+        for (String log : logEntry.logSet) {
+          String[] parts = log.split("/", 2);
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
+        }
+      }
+      
       log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
           + " entries created)");
     }
@@ -2241,7 +2237,7 @@ public class Tablet {
   private synchronized MinorCompactionTask prepareForMinC(long flushId) {
     CommitSession oldCommitSession = tabletMemory.prepareForMinC();
     otherLogs = currentLogs;
-    currentLogs = new HashSet<RemoteLogger>();
+    currentLogs = new HashSet<DfsLogger>();
     
     String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
     
@@ -3243,12 +3239,8 @@ public class Tablet {
           log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> "
               + datafileManager.abs2rel(new Path(compactTmpName)));
 
-          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, // always
-                                                                                                                                          // propagate
-                                                                                                                                          // deletes,
-                                                                                                                                          // unless
-                                                                                                                                          // last
-                                                                                                                                          // batch
+          // always propagate deletes, unless last batch
+          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true,
               acuTableConf, extent, cenv, compactionIterators);
           
           CompactionStats mcs = compactor.call();
@@ -3650,8 +3642,18 @@ public class Tablet {
     }
   }
   
-  private Set<RemoteLogger> currentLogs = new HashSet<RemoteLogger>();
+  private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
   
+  public Set<String> getCurrentLogs() { // returns a new set holding toString() of each logger in currentLogs
+    Set<String> result = new HashSet<String>();
+    synchronized (currentLogs) { // NOTE(review): locks the set object, which prepareForMinC replaces — confirm this is the intended lock
+      for (DfsLogger log : currentLogs) {
+        result.add(log.toString());
+      }
+    }
+    return result;
+  }
+
   private Set<String> beginClearingUnusedLogs() {
     Set<String> doomed = new HashSet<String>();
     
@@ -3665,12 +3667,12 @@ public class Tablet {
       if (removingLogs)
         throw new IllegalStateException("Attempted to clear logs when removal of logs in progress");
       
-      for (RemoteLogger logger : otherLogs) {
+      for (DfsLogger logger : otherLogs) {
         otherLogsCopy.add(logger.toString());
         doomed.add(logger.toString());
       }
       
-      for (RemoteLogger logger : currentLogs) {
+      for (DfsLogger logger : currentLogs) {
         currentLogsCopy.add(logger.toString());
         doomed.remove(logger.toString());
       }
@@ -3698,7 +3700,7 @@ public class Tablet {
     logLock.unlock();
   }
   
-  private Set<RemoteLogger> otherLogs = Collections.emptySet();
+  private Set<DfsLogger> otherLogs = Collections.emptySet();
   private boolean removingLogs = false;
   
   // this lock is basically used to synchronize writing of log info to !METADATA
@@ -3708,7 +3710,7 @@ public class Tablet {
     return currentLogs.size();
   }
   
-  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<RemoteLogger> more, boolean mincFinish) {
+  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
     
     boolean releaseLock = true;
     
@@ -3745,7 +3747,7 @@ public class Tablet {
         
         int numAdded = 0;
         int numContained = 0;
-        for (RemoteLogger logger : more) {
+        for (DfsLogger logger : more) {
           if (addToOther) {
             if (otherLogs.add(logger))
               numAdded++;

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 25 17:09:31 2012
@@ -18,11 +18,12 @@ package org.apache.accumulo.server.table
 
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
+import java.io.EOFException;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
@@ -45,6 +46,7 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
@@ -97,7 +99,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.master.MasterNotRunningException;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -134,6 +135,8 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -157,10 +160,9 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
-import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
 import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
 import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
@@ -187,9 +189,12 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.Platform;
-import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -209,22 +214,22 @@ public class TabletServer extends Abstra
   private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
   private static long lastMemorySize = 0;
   private static long gcTimeIncreasedCount;
-  private static final Class<? extends LoggerStrategy> DEFAULT_LOGGER_STRATEGY = RoundRobinLoggerStrategy.class;
   
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   
   private TabletServerLogger logger;
-  private LoggerStrategy loggerStrategy;
   
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
   
   private ServerConfiguration serverConfig;
+  private LogSorter logSorter = null;
   
   public TabletServer(ServerConfiguration conf, FileSystem fs) {
     super();
     this.serverConfig = conf;
     this.instance = conf.getInstance();
     this.fs = TraceFileSystem.wrap(fs);
+    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
     SimpleTimer.getInstance().schedule(new TimerTask() {
       @Override
       public void run() {
@@ -891,7 +896,7 @@ public class TabletServer extends Abstra
         
         ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
         String oldThreadName = Thread.currentThread().getName();
-
+        
         try {
           runState.set(ScanRunState.RUNNING);
           Thread.currentThread().setName(
@@ -956,8 +961,9 @@ public class TabletServer extends Abstra
           Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
           if (isCancelled() || session == null)
             return;
-
-          long maxResultsSize = acuConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+          
+          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
+          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
           long bytesAdded = 0;
           long maxScanTime = 4000;
           
@@ -1819,7 +1825,7 @@ public class TabletServer extends Abstra
       final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
       // Root tablet assignment must take place immediately
       if (extent.isRootTablet()) {
-        new Thread("Root Tablet Assignment") {
+        new Daemon("Root Tablet Assignment") {
           public void run() {
             ah.run();
             if (onlineTablets.containsKey(extent)) {
@@ -1942,12 +1948,6 @@ public class TabletServer extends Abstra
       return statsKeeper.getTabletStats();
     }
     
-    @Override
-    public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {
-      loggerStrategy.preferLoggers(loggers);
-    }
-    
-    @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
       try {
         checkPermission(credentials, null, true, "getScans");
@@ -2011,6 +2011,60 @@ public class TabletServer extends Abstra
 
     }
 
+    /*
+     * Deletes, or archives when Property.TSERV_ARCHIVE_WALOGS is true, each named write-ahead log under this server's walog directory. A name
+     * is skipped when a logger in the current log set or an online tablet's current log contains it. IOExceptions are logged, not rethrown.
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+     */
+    @Override
+    public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+      String myname = getClientAddressString();
+      myname = myname.replace(':', '+'); // the per-server walog directory name uses '+' where the address has ':'
+      Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
+      Set<String> loggers = new HashSet<String>();
+      logger.getLoggers(loggers); // fills the set with toString() of every logger in the tablet server's current log set
+      nextFile:
+      for (String filename : filenames) {
+        for (String logger : loggers) { // note: this local String shadows the logger field used above
+          if (logger.contains(filename))
+            continue nextFile; // still part of the current log set; leave it
+        }
+        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+        synchronized (onlineTablets) { // copy under the map's monitor so the scan below runs outside it
+          onlineTabletsCopy.addAll(onlineTablets.values());
+        }
+        for (Tablet tablet : onlineTabletsCopy) {
+          for (String current : tablet.getCurrentLogs()) {
+            if (current.contains(filename)) {
+              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+              continue nextFile; // an online tablet still lists this log; do not remove it
+            }
+          }
+        }
+        try {
+          String source = logDir + "/" + filename; // NOTE(review): filename is unvalidated and credentials is never checked here — confirm callers are trusted
+          if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+            fs.mkdirs(new Path(walogArchive));
+            String dest = walogArchive + "/" + filename;
+            log.info("Archiving walog " + source + " to " + dest);
+            if (!fs.rename(new Path(source), new Path(dest)))
+              log.error("rename is unsuccessful");
+          } else {
+            log.info("Deleting walog " + filename);
+            if (!fs.delete(new Path(source), true))
+              log.warn("Failed to delete walog " + source);
+            if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true)) // also drop the same-named entry in the recovery dir, if any
+              log.info("Deleted any recovery log " + filename);
+
+          }
+        } catch (IOException e) { // best effort: report and move on to the next filename
+          log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+        }
+      }
+    }
+    
   }
   
   private class SplitRunner implements Runnable {
@@ -2445,7 +2499,7 @@ public class TabletServer extends Abstra
               AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1);
               if (extent.isMeta()) {
                 if (extent.isRootTablet()) {
-                  new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+                  new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
                 } else {
                   resourceManager.addMetaDataAssignment(handler);
                 }
@@ -2463,7 +2517,6 @@ public class TabletServer extends Abstra
   
   private FileSystem fs;
   private Instance instance;
-  private ZooCache cache;
   
   private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
   private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@@ -2496,48 +2549,23 @@ public class TabletServer extends Abstra
     return statsKeeper;
   }
   
-  public Set<String> getLoggers() throws TException, MasterNotRunningException, ThriftSecurityException {
-    Set<String> allLoggers = new HashSet<String>();
-    String dir = ZooUtil.getRoot(instance) + Constants.ZLOGGERS;
-    for (String child : cache.getChildren(dir)) {
-      allLoggers.add(new String(cache.get(dir + "/" + child)));
-    }
-    if (allLoggers.isEmpty()) {
-      log.warn("there are no loggers registered in zookeeper");
-      return allLoggers;
-    }
-    Set<String> result = loggerStrategy.getLoggers(Collections.unmodifiableSet(allLoggers));
-    Set<String> bogus = new HashSet<String>(result);
-    bogus.removeAll(allLoggers);
-    if (!bogus.isEmpty())
-      log.warn("logger strategy is returning loggers that are not candidates");
-    result.removeAll(bogus);
-    if (result.isEmpty())
-      log.warn("strategy returned no useful loggers");
-    return result;
-  }
-  
-  public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
+  public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
     log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
     
-    List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
     long now = RelativeTime.currentTimeMillis();
     List<String> logSet = new ArrayList<String>();
-    for (RemoteLogger log : logs)
+    for (DfsLogger log : logs)
       logSet.add(log.toString());
-    for (RemoteLogger log : logs) {
-      MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
-      entry.extent = extent;
-      entry.tabletId = id;
-      entry.timestamp = now;
-      entry.server = log.getLogger();
-      entry.filename = log.getFileName();
-      entry.logSet = logSet;
-      entries.add(entry);
-    }
-    MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
+    MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
+    entry.extent = extent;
+    entry.tabletId = id;
+    entry.timestamp = now;
+    entry.server = logs.get(0).getLogger();
+    entry.filename = logs.get(0).getFileName();
+    entry.logSet = logSet;
+    MetadataTable.addLogEntry(SecurityConstants.getSystemCredentials(), entry, getLock());
   }
-  
+
   private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
     ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
         Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK);
@@ -2650,6 +2678,12 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
+    try {
+      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+    } catch (Exception ex) {
+      log.error("Error setting watches for recoveries");
+      throw new RuntimeException(ex);
+    }
     
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
@@ -2960,31 +2994,6 @@ public class TabletServer extends Abstra
     majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor()));
     majorCompactorThread.setName("Split/MajC initiator");
     majorCompactorThread.start();
-    
-    String className = getSystemConfiguration().get(Property.TSERV_LOGGER_STRATEGY);
-    Class<? extends LoggerStrategy> klass = DEFAULT_LOGGER_STRATEGY;
-    try {
-      klass = AccumuloClassLoader.loadClass(className, LoggerStrategy.class);
-    } catch (Exception ex) {
-      log.warn("Unable to load class " + className + " for logger strategy, using " + klass.getName(), ex);
-    }
-    try {
-      Constructor<? extends LoggerStrategy> constructor = klass.getConstructor(TabletServer.class);
-      loggerStrategy = constructor.newInstance(this);
-      loggerStrategy.init(serverConfig);
-    } catch (Exception ex) {
-      log.warn("Unable to create object of type " + klass.getName() + " using " + DEFAULT_LOGGER_STRATEGY.getName());
-    }
-    if (loggerStrategy == null) {
-      try {
-        loggerStrategy = DEFAULT_LOGGER_STRATEGY.getConstructor(TabletServer.class).newInstance(this);
-      } catch (Exception ex) {
-        log.fatal("Programmer error: cannot create a logger strategy.");
-        throw new RuntimeException(ex);
-      }
-    }
-    cache = new ZooCache();
-    
   }
   
   public TabletServerStatus getStats(Map<String,MapCounter<ScanRunState>> scanCounts) {
@@ -3065,12 +3074,11 @@ public class TabletServer extends Abstra
     result.name = getClientAddressString();
     result.holdTime = resourceManager.holdTime();
     result.lookups = seekCount.get();
-    result.loggers = new HashSet<String>();
     result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount();
     result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount();
     result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
     result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
-    logger.getLoggers(result.loggers);
+    result.logSorts = logSorter.getLogSorts();
     return result;
   }
   
@@ -3082,6 +3090,7 @@ public class TabletServer extends Abstra
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
+      recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");
@@ -3090,7 +3099,84 @@ public class TabletServer extends Abstra
       log.error("Uncaught exception in TabletServer.main, exiting", ex);
     }
   }
-  
+
+  /**
+   * Copy local walogs into HDFS on an upgrade.
+   * 
+   * Every comma-separated directory in Property.LOGGER_DIR is scanned (relative entries are resolved against ACCUMULO_HOME). Each file whose
+   * name parses as a UUID is re-written into the walog directory as "name.copy", renamed to "name", and only then deleted locally.
+   * 
+   * @param fs
+   *          the file system that receives the copied logs
+   * @param serverConf
+   *          supplies the configuration that names the local and destination walog directories
+   * @throws IOException
+   *           if a log cannot be read, written or renamed; the local file is left in place in that case
+   */
+  public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
+    FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
+    AccumuloConfiguration conf = serverConf.getConfiguration();
+    String localWalDirectories = conf.get(Property.LOGGER_DIR);
+    for (String localWalDirectory : localWalDirectories.split(",")) {
+      if (!localWalDirectory.startsWith("/")) {
+        localWalDirectory = System.getenv("ACCUMULO_HOME") + "/" + localWalDirectory;
+      }
+      
+      FileStatus status = null;
+      try {
+        status = localfs.getFileStatus(new Path(localWalDirectory));
+      } catch (FileNotFoundException fne) {
+        // a missing directory just means there is nothing to recover; the null check below handles it
+      }
+      
+      if (status == null || !status.isDir()) {
+        log.debug("Local walog dir " + localWalDirectory + " not found ");
+        continue;
+      }
+
+      for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
+        String name = file.getPath().getName();
+        try {
+          UUID.fromString(name);
+        } catch (IllegalArgumentException ex) {
+          log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
+          continue;
+        }
+        LogFileKey key = new LogFileKey();
+        LogFileValue value = new LogFileValue();
+        log.info("Openning local log " + file.getPath());
+        Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
+        Path dest = new Path(tmp.getParent(), name);
+        // close both streams even when the copy fails part way through
+        Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
+        try {
+          FSDataOutputStream writer = fs.create(tmp);
+          try {
+            while (reader.next(key, value)) {
+              try {
+                key.write(writer);
+                value.write(writer);
+              } catch (EOFException ex) {
+                // NOTE(review): kept as committed; a truncated source seems more likely to surface from reader.next — confirm placement
+                break;
+              }
+            }
+          } finally {
+            writer.close();
+          }
+        } finally {
+          reader.close();
+        }
+        // never remove the local log unless its copy is in place under the final name
+        if (!fs.rename(tmp, dest)) {
+          throw new IOException("Unable to rename " + tmp + " to " + dest + "; keeping local log " + file.getPath());
+        }
+        log.info("Copied local log " + name);
+        localfs.delete(new Path(localWalDirectory, name), true);
+      }
+    }
+  }
+
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions++;
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3113,7 +3176,7 @@ public class TabletServer extends Abstra
       String recovery = null;
       for (String log : entry.logSet) {
         String[] parts = log.split("/"); // "host:port/filename"
-        log = ServerConstants.getRecoveryDir() + "/" + parts[1] + ".recovered";
+        log = ServerConstants.getRecoveryDir() + "/" + parts[1];
         Path finished = new Path(log + "/finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
@@ -3306,11 +3369,27 @@ public class TabletServer extends Abstra
     return METRICS_PREFIX;
   }
   
-  // public AccumuloConfiguration getTableConfiguration(String tableId) {
-  // return ServerConfiguration.getTableConfiguration(instance, tableId);
-  // }
-
   public TableConfiguration getTableConfiguration(KeyExtent extent) {
     return ServerConfiguration.getTableConfiguration(instance, extent.getTableId().toString());
   }
+  public DfsLogger.ServerResources getServerConfig() { // builds a new ServerResources view over this tablet server on every call
+    return new DfsLogger.ServerResources() {
+      
+      @Override
+      public FileSystem getFileSystem() {
+        return fs; // the enclosing server's file system field
+      }
+      
+      @Override
+      public Set<TServerInstance> getCurrentTServers() {
+        return null; // NOTE(review): no tserver set is supplied here — confirm callers tolerate null
+      }
+      
+      @Override
+      public AccumuloConfiguration getConfiguration() {
+        return getSystemConfiguration(); // delegates to the enclosing server on each call
+      }
+    };
+  }
+
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Mon Jun 25 17:09:31 2012
@@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 import org.apache.hadoop.fs.FileSystem;

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Jun 25 17:09:31 2012
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -35,13 +34,12 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.tabletserver.Tablet;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.TabletServer;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
 import org.apache.log4j.Logger;
 
 /**
@@ -61,7 +59,7 @@ public class TabletServerLogger {
   private final TabletServer tserver;
   
   // The current log set: always updated to a new set with every change of loggers
-  private final List<RemoteLogger> loggers = new ArrayList<RemoteLogger>();
+  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
   
   // The current generation of logSet.
   // Because multiple threads can be using a log set at one time, a log
@@ -134,7 +132,7 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
   }
   
-  private int initializeLoggers(final List<RemoteLogger> copy) throws IOException {
+  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
     final int[] result = {-1};
     testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       boolean test() {
@@ -165,8 +163,8 @@ public class TabletServerLogger {
   public void getLoggers(Set<String> loggersOut) {
     logSetLock.readLock().lock();
     try {
-      for (RemoteLogger logger : loggers) {
-        loggersOut.add(logger.getLogger());
+      for (DfsLogger logger : loggers) {
+        loggersOut.add(logger.toString());
       }
     } finally {
       logSetLock.readLock().unlock();
@@ -183,35 +181,13 @@ public class TabletServerLogger {
     }
     
     try {
-      while (true) {
-        Set<String> loggerAddresses = tserver.getLoggers();
-        if (!loggerAddresses.isEmpty()) {
-          for (String logger : loggerAddresses) {
-            try {
-              loggers.add(new RemoteLogger(logger, UUID.randomUUID(), tserver.getSystemConfiguration()));
-            } catch (LoggerClosedException t) {
-              close();
-              break;
-            } catch (Exception t) {
-              close();
-              log.warn("Unable to connect to " + logger + ": " + t);
-              break;
-            }
-          }
-          
-          if (loggers.size() == loggerAddresses.size())
-            break;
-          if (loggers.size() > 0) {
-            // something is screwy, loggers.size() should be 0 or loggerAddresses.size()..
-            throw new RuntimeException("Unexpected number of loggers " + loggers.size() + " " + loggerAddresses.size());
-          }
-        }
-        UtilWaitThread.sleep(1000);
-      }
+      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+      alog.open(tserver.getClientAddressString());
+      loggers.add(alog);
       logSetId.incrementAndGet();
       return;
     } catch (Exception t) {
-      throw new IOException(t);
+      throw new RuntimeException(t);
     }
   }
   
@@ -229,13 +205,13 @@ public class TabletServerLogger {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      for (RemoteLogger logger : loggers) {
+      for (DfsLogger logger : loggers) {
         try {
           logger.close();
-        } catch (LoggerClosedException ex) {
-          // expected
+        } catch (DfsLogger.LogClosedException ex) {
+          // ignore
         } catch (Throwable ex) {
-          log.error("Unable to cleanly close logger " + logger.getLogger() + ": " + ex);
+          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
         }
       }
       loggers.clear();
@@ -246,7 +222,7 @@ public class TabletServerLogger {
   }
   
   interface Writer {
-    LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
+    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
   }
   
   private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -265,7 +241,7 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        ArrayList<RemoteLogger> copy = new ArrayList<RemoteLogger>();
+        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
         currentLogSet = initializeLoggers(copy);
         
         // add the logger to the log set for the memory in the tablet,
@@ -294,7 +270,7 @@ public class TabletServerLogger {
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
           ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
-          for (RemoteLogger wal : copy) {
+          for (DfsLogger wal : copy) {
             LoggerOperation lop = writer.write(wal, seq);
             if (lop != null)
               queuedOperations.add(lop);
@@ -307,13 +283,11 @@ public class TabletServerLogger {
           // double-check: did the log set change?
           success = (currentLogSet == logSetId.get());
         }
+      } catch (DfsLogger.LogClosedException ex) {
+        log.debug("Logs closed while writing, retrying " + (attempt + 1));
       } catch (Exception t) {
-        if (attempt == 0) {
-          log.info("Log write failed: another thread probably closed the log");
-        } else {
-          log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
-          UtilWaitThread.sleep(100);
-        }
+        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+        UtilWaitThread.sleep(100);
       } finally {
         attempt++;
       }
@@ -356,7 +330,7 @@ public class TabletServerLogger {
       return -1;
     return write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
         return null;
       }
@@ -368,7 +342,7 @@ public class TabletServerLogger {
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         return logger.log(tabletSeq, commitSession.getLogId(), m);
       }
     });
@@ -388,7 +362,7 @@ public class TabletServerLogger {
     
     int seq = write(loggables.keySet(), false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
@@ -419,7 +393,7 @@ public class TabletServerLogger {
     
     int seq = write(commitSession, true, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }
@@ -435,7 +409,7 @@ public class TabletServerLogger {
       return -1;
     write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Mon Jun 25 17:09:31 2012
@@ -90,9 +90,6 @@ public class GetMasterStats {
         out(2, "Time Difference %.1f", ((now - server.lastContact) / 1000.));
         out(2, "Total Records %d", summary.recs);
         out(2, "Lookups %d", server.lookups);
-        out(2, "Loggers %d", server.loggers.size());
-        for (String logger : server.loggers)
-          out(3, "Logger %s", logger);
         if (server.holdTime > 0)
           out(2, "Hold Time %d", server.holdTime);
         if (server.tableMap != null && server.tableMap.size() > 0) {
@@ -111,16 +108,12 @@ public class GetMasterStats {
             out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : info.minor.queued);
           }
         }
-      }
-    }
-    if (stats.recovery != null && stats.recovery.size() > 0) {
-      out(0, "Recovery");
-      for (RecoveryStatus r : stats.recovery) {
-        out(1, "Log Server %s", r.host);
-        out(1, "Log Name %s", r.name);
-        out(1, "Map Progress: %.2f%%", r.mapProgress * 100);
-        out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100);
-        out(1, "Time running: %s", r.runtime / 1000.);
+        out(2, "Recoveries %d", server.logSorts.size());
+        for (RecoveryStatus sort : server.logSorts) {
+          out(3, "File %s", sort.name);
+          out(3, "Progress %.2f%%", sort.progress * 100);
+          out(3, "Time running %s", sort.runtime / 1000.);
+        }
       }
     }
   }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java Mon Jun 25 17:09:31 2012
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.B
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -67,11 +68,11 @@ public class ContinuousIngest {
     
     args = processOptions(args);
     
-    if (args.length != 13) {
+    if (args.length != 14) {
       throw new IllegalArgumentException(
           "usage : "
               + ContinuousIngest.class.getName()
-              + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
+              + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <num> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
     }
     
     if (debugLog != null) {
@@ -89,16 +90,17 @@ public class ContinuousIngest {
     
     String table = args[4];
     
-    long min = Long.parseLong(args[5]);
-    long max = Long.parseLong(args[6]);
-    short maxColF = Short.parseShort(args[7]);
-    short maxColQ = Short.parseShort(args[8]);
-    
-    long maxMemory = Long.parseLong(args[9]);
-    long maxLatency = Integer.parseInt(args[10]);
-    int maxWriteThreads = Integer.parseInt(args[11]);
+    long num = Long.parseLong(args[5]);
+    long min = Long.parseLong(args[6]);
+    long max = Long.parseLong(args[7]);
+    short maxColF = Short.parseShort(args[8]);
+    short maxColQ = Short.parseShort(args[9]);
+    
+    long maxMemory = Long.parseLong(args[10]);
+    long maxLatency = Integer.parseInt(args[11]);
+    int maxWriteThreads = Integer.parseInt(args[12]);
     
-    boolean checksum = Boolean.parseBoolean(args[12]);
+    boolean checksum = Boolean.parseBoolean(args[13]);
     
     if (min < 0 || max < 0 || max <= min) {
       throw new IllegalArgumentException("bad min and max");
@@ -109,6 +111,11 @@ public class ContinuousIngest {
     String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
     Tracer.getInstance().addReceiver(new ZooSpanClient(zooKeepers, path, localhost, "cingest", 1000));
     
+    if (!conn.tableOperations().exists(table))
+      try {
+        conn.tableOperations().create(table);
+      } catch (TableExistsException tee) {}
+
     BatchWriter bw = conn.createBatchWriter(table, maxMemory, maxLatency, maxWriteThreads);
     bw = Trace.wrapAll(bw, new CountSampler(1024));
     
@@ -133,7 +140,7 @@ public class ContinuousIngest {
     
     long lastFlushTime = System.currentTimeMillis();
     
-    while (true) {
+    out: while (true) {
       // generate first set of nodes
       for (int index = 0; index < flushInterval; index++) {
         long rowLong = genLong(min, max, r);
@@ -152,6 +159,8 @@ public class ContinuousIngest {
       }
       
       lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+      if (count >= num)
+        break out;
       
       // generate subsequent sets of nodes that link to previous set of nodes
       for (int depth = 1; depth < maxDepth; depth++) {
@@ -165,6 +174,8 @@ public class ContinuousIngest {
         }
         
         lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+        if (count >= num)
+          break out;
       }
       
       // create one big linked list, this makes all of the first inserts
@@ -175,7 +186,11 @@ public class ContinuousIngest {
         bw.addMutation(m);
       }
       lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+      if (count >= num)
+        break out;
     }
+    
+    bw.close();
   }
   
   private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java Mon Jun 25 17:09:31 2012
@@ -95,6 +95,26 @@ public class BadIteratorMincTest extends
     
     if (count != 1)
       throw new Exception("Did not see expected # entries " + count);
+    
+    // now try putting bad iterator back and deleting the table
+    getConnector().tableOperations().setProperty("foo", Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.badi", "30," + BadIterator.class.getName());
+    bw = getConnector().createBatchWriter("foo", 1000000, 60000l, 2);
+    m = new Mutation(new Text("r2"));
+    m.put(new Text("acf"), new Text("foo"), new Value("1".getBytes()));
+    bw.addMutation(m);
+    bw.close();
+    
+    // make sure property is given time to propagate
+    UtilWaitThread.sleep(1000);
+    
+    getConnector().tableOperations().flush("foo", null, null, false);
+    
+    // make sure the flush has time to start
+    UtilWaitThread.sleep(1000);
+    
+    // this should not hang
+    getConnector().tableOperations().delete("foo");
+
   }
   
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java Mon Jun 25 17:09:31 2012
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.accumulo.server.logger.IdentityReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -135,7 +134,7 @@ public class RunTests extends Configured
     job.setOutputValueClass(Text.class);
     
     // don't do anything with the results (yet) a summary would be nice
-    job.setReducerClass(IdentityReducer.class);
+    job.setNumReduceTasks(0);
     
     // submit the job
     log.info("Starting tests");

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Mon Jun 25 17:09:31 2012
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
 import org.apache.accumulo.core.client.Instance;
@@ -164,9 +163,6 @@ public class NullTserver {
     public void unloadTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
     
     @Override
-    public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {}
-    
-    @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveScan>();
     }
@@ -188,6 +184,16 @@ public class NullTserver {
     public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
       
     }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+     */
+    @Override
+    public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+    }
   }
   
   public static void main(String[] args) throws Exception {

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Mon Jun 25 17:09:31 2012
@@ -19,11 +19,7 @@ package org.apache.accumulo.server.test.
 import java.net.InetAddress;
 import java.util.Properties;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
@@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a
 import org.apache.accumulo.core.iterators.LongCombiner;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.server.test.randomwalk.State;
 import org.apache.accumulo.server.test.randomwalk.Test;
 import org.apache.hadoop.fs.FileSystem;
 
 public class Setup extends Test {
   
-  private static final int CORE_POOL_SIZE = 8;
-  private static final int MAX_POOL_SIZE = CORE_POOL_SIZE;
+  private static final int MAX_POOL_SIZE = 8;
   static String tableName = null;
   
   @Override
@@ -67,14 +61,7 @@ public class Setup extends Test {
     state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
     BulkPlusOne.counter.set(0l);
     
-    BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
-    ThreadFactory factory = new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Daemon(new LoggingRunnable(log, r));
-      }
-    };
-    ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory);
+    ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool");
     state.set("pool", e);
   }
   

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java Mon Jun 25 17:09:31 2012
@@ -59,7 +59,7 @@ public class CheckBalance extends Test {
     // Check for even # of tablets on each node
     boolean balanced = true;
     for (Entry<String,Long> entry : counts.entrySet()) {
-      if (Math.abs(entry.getValue().longValue() - average) > 1) {
+      if (Math.abs(entry.getValue().longValue() - average) > Math.max(1, average / 5)) {
         balanced = false;
         break;
       }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java Mon Jun 25 17:09:31 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab
 // If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually
 
 public class TraceFileSystem extends FileSystem {
+
   @Override
   public void setConf(Configuration conf) {
     Span span = Trace.start("setConf");
@@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil
     this.impl = impl;
   }
   
+  public FileSystem getImplementation() {
+    return impl;
+  }
+
   @Override
   public URI getUri() {
     Span span = Trace.start("getUri");

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Mon Jun 25 17:09:31 2012
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.conf.S
 
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, Property portDefaultProperty) {
-    final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
+    final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(portDefaultProperty);
     return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
   }