You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2012/06/25 19:09:39 UTC
svn commit: r1353663 [5/6] - in /accumulo/branches/ACCUMULO-259: ./ bin/
conf/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/
conf/examples/512MB/native-standalone/ conf/examples/512MB/standalone/
core/ core/src/main/java/org/apache...
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Mon Jun 25 17:09:31 2012
@@ -56,10 +56,10 @@ import org.apache.accumulo.server.monito
import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
import org.apache.accumulo.server.monitor.servlets.JSONServlet;
import org.apache.accumulo.server.monitor.servlets.LogServlet;
-import org.apache.accumulo.server.monitor.servlets.LoggersServlet;
import org.apache.accumulo.server.monitor.servlets.MasterServlet;
import org.apache.accumulo.server.monitor.servlets.OperationServlet;
import org.apache.accumulo.server.monitor.servlets.ProblemServlet;
+import org.apache.accumulo.server.monitor.servlets.ShellServlet;
import org.apache.accumulo.server.monitor.servlets.TServersServlet;
import org.apache.accumulo.server.monitor.servlets.TablesServlet;
import org.apache.accumulo.server.monitor.servlets.VisServlet;
@@ -148,6 +148,8 @@ public class Monitor {
private static ServerConfiguration config;
+ private static EmbeddedWebServer server;
+
public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
Map<String,Double> compactingByTable = new HashMap<String,Double>();
if (mmi != null && mmi.tServerInfo != null) {
@@ -307,85 +309,84 @@ public class Monitor {
if (mmi == null)
UtilWaitThread.sleep(1000);
}
-
- int majorCompactions = 0;
- int minorCompactions = 0;
-
- lookupRateTracker.startingUpdates();
- indexCacheHitTracker.startingUpdates();
- indexCacheRequestTracker.startingUpdates();
- dataCacheHitTracker.startingUpdates();
- dataCacheRequestTracker.startingUpdates();
-
- for (TabletServerStatus server : mmi.tServerInfo) {
- TableInfo summary = Monitor.summarizeTableStats(server);
- totalIngestRate += summary.ingestRate;
- totalIngestByteRate += summary.ingestByteRate;
- totalQueryRate += summary.queryRate;
- totalScanRate += summary.scanRate;
- totalQueryByteRate += summary.queryByteRate;
- totalEntries += summary.recs;
- totalHoldTime += server.holdTime;
- totalLookups += server.lookups;
- majorCompactions += summary.major.running;
- minorCompactions += summary.minor.running;
- lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
- indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
- indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
- dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
- dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
- }
-
- lookupRateTracker.finishedUpdating();
- indexCacheHitTracker.finishedUpdating();
- indexCacheRequestTracker.finishedUpdating();
- dataCacheHitTracker.finishedUpdating();
- dataCacheRequestTracker.finishedUpdating();
-
- int totalTables = 0;
- for (TableInfo tInfo : mmi.tableMap.values()) {
- totalTabletCount += tInfo.tablets;
- onlineTabletCount += tInfo.onlineTablets;
- totalTables++;
- }
- Monitor.totalIngestRate = totalIngestRate;
- Monitor.totalTables = totalTables;
- totalIngestByteRate = totalIngestByteRate / 1000000.0;
- Monitor.totalIngestByteRate = totalIngestByteRate;
- Monitor.totalQueryRate = totalQueryRate;
- Monitor.totalScanRate = totalScanRate;
- totalQueryByteRate = totalQueryByteRate / 1000000.0;
- Monitor.totalQueryByteRate = totalQueryByteRate;
- Monitor.totalEntries = totalEntries;
- Monitor.totalTabletCount = totalTabletCount;
- Monitor.onlineTabletCount = onlineTabletCount;
- Monitor.totalHoldTime = totalHoldTime;
- Monitor.totalLookups = totalLookups;
-
- ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
- ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
- recoveriesOverTime.add(new Pair<Long,Integer>(currentTime, mmi.recovery.size()));
-
- double totalLoad = 0.;
- for (TabletServerStatus status : mmi.tServerInfo) {
- if (status != null)
- totalLoad += status.osLoad;
- }
- loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
-
- minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
- majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
-
- lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
-
- queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
- queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
-
- scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+ if (mmi != null) {
+ int majorCompactions = 0;
+ int minorCompactions = 0;
+
+ lookupRateTracker.startingUpdates();
+ indexCacheHitTracker.startingUpdates();
+ indexCacheRequestTracker.startingUpdates();
+ dataCacheHitTracker.startingUpdates();
+ dataCacheRequestTracker.startingUpdates();
- calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
- calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
-
+ for (TabletServerStatus server : mmi.tServerInfo) {
+ TableInfo summary = Monitor.summarizeTableStats(server);
+ totalIngestRate += summary.ingestRate;
+ totalIngestByteRate += summary.ingestByteRate;
+ totalQueryRate += summary.queryRate;
+ totalScanRate += summary.scanRate;
+ totalQueryByteRate += summary.queryByteRate;
+ totalEntries += summary.recs;
+ totalHoldTime += server.holdTime;
+ totalLookups += server.lookups;
+ majorCompactions += summary.major.running;
+ minorCompactions += summary.minor.running;
+ lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
+ indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
+ indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
+ dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
+ dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+ }
+
+ lookupRateTracker.finishedUpdating();
+ indexCacheHitTracker.finishedUpdating();
+ indexCacheRequestTracker.finishedUpdating();
+ dataCacheHitTracker.finishedUpdating();
+ dataCacheRequestTracker.finishedUpdating();
+
+ int totalTables = 0;
+ for (TableInfo tInfo : mmi.tableMap.values()) {
+ totalTabletCount += tInfo.tablets;
+ onlineTabletCount += tInfo.onlineTablets;
+ totalTables++;
+ }
+ Monitor.totalIngestRate = totalIngestRate;
+ Monitor.totalTables = totalTables;
+ totalIngestByteRate = totalIngestByteRate / 1000000.0;
+ Monitor.totalIngestByteRate = totalIngestByteRate;
+ Monitor.totalQueryRate = totalQueryRate;
+ Monitor.totalScanRate = totalScanRate;
+ totalQueryByteRate = totalQueryByteRate / 1000000.0;
+ Monitor.totalQueryByteRate = totalQueryByteRate;
+ Monitor.totalEntries = totalEntries;
+ Monitor.totalTabletCount = totalTabletCount;
+ Monitor.onlineTabletCount = onlineTabletCount;
+ Monitor.totalHoldTime = totalHoldTime;
+ Monitor.totalLookups = totalLookups;
+
+ ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
+ ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
+
+ double totalLoad = 0.;
+ for (TabletServerStatus status : mmi.tServerInfo) {
+ if (status != null)
+ totalLoad += status.osLoad;
+ }
+ loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
+
+ minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
+ majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
+
+ lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
+
+ queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
+ queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
+
+ scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+
+ calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
+ calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
+ }
try {
Monitor.problemSummary = ProblemReports.getInstance().summarize();
Monitor.problemException = null;
@@ -462,7 +463,6 @@ public class Monitor {
public void run(String hostname) {
Monitor.START_TIME = System.currentTimeMillis();
int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
- EmbeddedWebServer server;
try {
log.debug("Creating monitor on port " + port);
server = EmbeddedWebServer.create(port);
@@ -476,7 +476,6 @@ public class Monitor {
server.addServlet(MasterServlet.class, "/master");
server.addServlet(TablesServlet.class, "/tables");
server.addServlet(TServersServlet.class, "/tservers");
- server.addServlet(LoggersServlet.class, "/loggers");
server.addServlet(ProblemServlet.class, "/problems");
server.addServlet(GcStatusServlet.class, "/gc");
server.addServlet(LogServlet.class, "/log");
@@ -486,6 +485,8 @@ public class Monitor {
server.addServlet(Summary.class, "/trace/summary");
server.addServlet(ListType.class, "/trace/listType");
server.addServlet(ShowTrace.class, "/trace/show");
+ if (server.isUsingSsl())
+ server.addServlet(ShellServlet.class, "/shell");
LogService.startLogListener(Monitor.getSystemConfiguration());
server.start();
@@ -545,7 +546,7 @@ public class Monitor {
public static double getTotalScanRate() {
return totalScanRate;
}
-
+
public static double getTotalQueryByteRate() {
return totalQueryByteRate;
}
@@ -631,7 +632,7 @@ public class Monitor {
return new ArrayList<Pair<Long,Integer>>(scanRateOverTime);
}
}
-
+
public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
synchronized (queryByteRateOverTime) {
return new ArrayList<Pair<Long,Double>>(queryByteRateOverTime);
@@ -657,4 +658,8 @@ public class Monitor {
public static Instance getInstance() {
return instance;
}
+
+ public static boolean isUsingSsl() {
+ return server.isUsingSsl();
+ }
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Mon Jun 25 17:09:31 2012
@@ -73,7 +73,7 @@ abstract public class BasicServlet exten
}
}
- protected final void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doGet(req, resp);
}
@@ -136,7 +136,8 @@ abstract public class BasicServlet exten
// BEGIN HEADER
sb.append("<head>\n");
sb.append("<title>").append(getTitle(req)).append(" - Accumulo ").append(Constants.VERSION).append("</title>\n");
- if ((refresh > 0) && (req.getRequestURI().startsWith("/docs") == false) && (req.getRequestURI().startsWith("/vis") == false))
+ if ((refresh > 0) && (req.getRequestURI().startsWith("/docs") == false) && (req.getRequestURI().startsWith("/vis") == false)
+ && (req.getRequestURI().startsWith("/shell") == false))
sb.append("<meta http-equiv='refresh' content='" + refresh + "' />\n");
sb.append("<meta http-equiv='Content-Type' content='").append(DEFAULT_CONTENT_TYPE).append("' />\n");
sb.append("<meta http-equiv='Content-Script-Type' content='text/javascript' />\n");
@@ -175,7 +176,6 @@ abstract public class BasicServlet exten
sb.append("<hr />\n");
sb.append("<a href='/master'>Master Server</a><br />\n");
sb.append("<a href='/tservers'>Tablet Servers</a><br />\n");
- sb.append("<a href='/loggers'>Logger Servers</a><br />\n");
sb.append("<a href='/vis'>Server Activity</a><br />\n");
sb.append("<a href='/gc'>Garbage Collector</a><br />\n");
sb.append("<a href='/tables'>Tables</a><br />\n");
@@ -190,6 +190,8 @@ abstract public class BasicServlet exten
sb.append("<hr />\n");
sb.append("<a href='/xml'>XML</a><br />\n");
sb.append("<a href='/json'>JSON</a><hr />\n");
+ if (Monitor.isUsingSsl())
+ sb.append("<a href='/shell'>Shell</a><hr />\n");
sb.append("<div class='smalltext'>[<a href='").append("/op?action=refresh&value=").append(refresh < 1 ? "5" : "-1");
sb.append("&redir=").append(currentPage(req)).append("'>");
sb.append(refresh < 1 ? "en" : "dis").append("able auto-refresh</a>]</div>\n");
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java Mon Jun 25 17:09:31 2012
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.master.thrift.MasterState;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.StringUtil;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.monitor.DedupedLogEvent;
@@ -43,9 +42,9 @@ import org.apache.accumulo.server.monito
import org.apache.accumulo.server.monitor.util.Table;
import org.apache.accumulo.server.monitor.util.TableRow;
import org.apache.accumulo.server.monitor.util.celltypes.DurationType;
-import org.apache.accumulo.server.monitor.util.celltypes.LoggerLinkType;
import org.apache.accumulo.server.monitor.util.celltypes.NumberType;
import org.apache.accumulo.server.monitor.util.celltypes.ProgressChartType;
+import org.apache.accumulo.server.util.AddressUtil;
import org.apache.log4j.Level;
public class MasterServlet extends BasicServlet {
@@ -57,7 +56,7 @@ public class MasterServlet extends Basic
@Override
protected String getTitle(HttpServletRequest req) {
List<String> masters = Monitor.getInstance().getMasterLocations();
- return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+ return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
}
@Override
@@ -137,8 +136,6 @@ public class MasterServlet extends Basic
masterStatus.addSortableColumn("# Online<br />Tablet Servers", new NumberType<Integer>((int) (slaves.size() * 0.8 + 1.0), slaves.size(),
(int) (slaves.size() * 0.6 + 1.0), slaves.size()), "Number of tablet servers currently available");
masterStatus.addSortableColumn("# Total<br />Tablet Servers", new NumberType<Integer>(), "The total number of tablet servers configured");
- masterStatus.addSortableColumn("Loggers", new NumberType<Integer>((int) (slaves.size() * .8), Integer.MAX_VALUE, 1, Integer.MAX_VALUE),
- "The number of write-ahead loggers. This should be approximately the same as the number of tablet servers (and greater than zero).");
masterStatus.addSortableColumn("Last GC", null, "The last time files were cleaned-up from HDFS.");
masterStatus.addSortableColumn("# Tablets", new NumberType<Integer>(0, Integer.MAX_VALUE, 2, Integer.MAX_VALUE), null);
masterStatus.addSortableColumn("# Unassigned<br />Tablets", new NumberType<Integer>(0, 0), null);
@@ -153,10 +150,9 @@ public class MasterServlet extends Basic
masterStatus.addSortableColumn("OS Load", new NumberType<Double>(0., guessHighLoad * 1., 0., guessHighLoad * 3.),
"The one-minute load average on the computer that runs the monitor web server.");
TableRow row = masterStatus.prepareRow();
- row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+ row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
row.add(Monitor.getMmi().tServerInfo.size());
row.add(slaves.size());
- row.add(Monitor.getMmi().loggers.size());
row.add("<a href='/gc'>" + gcStatus + "</a>");
row.add(Monitor.getTotalTabletCount());
row.add(Monitor.getMmi().unassignedTablets);
@@ -177,25 +173,28 @@ public class MasterServlet extends Basic
private void doRecoveryList(HttpServletRequest req, StringBuilder sb) {
MasterMonitorInfo mmi = Monitor.getMmi();
if (mmi != null) {
- List<RecoveryStatus> jobs = mmi.recovery;
- if (jobs != null && jobs.size() > 0) {
- Table recoveryTable = new Table("logRecovery", "Log Recovery");
- recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
- recoveryTable.addSortableColumn("Server", new LoggerLinkType(), null);
- recoveryTable.addSortableColumn("Log");
- recoveryTable.addSortableColumn("Time", new DurationType(), null);
- recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
-
- for (RecoveryStatus recovery : jobs) {
- TableRow row = recoveryTable.prepareRow();
- row.add(recovery);
- row.add(recovery.name);
- row.add((long) recovery.runtime);
- row.add(recovery.copyProgress);
- recoveryTable.addRow(row);
+ Table recoveryTable = new Table("logRecovery", "Log Recovery");
+ recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
+ recoveryTable.addSortableColumn("Server");
+ recoveryTable.addSortableColumn("Log");
+ recoveryTable.addSortableColumn("Time", new DurationType(), null);
+ recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
+ int rows = 0;
+ for (TabletServerStatus server : mmi.tServerInfo) {
+ if (server.logSorts != null) {
+ for (RecoveryStatus recovery : server.logSorts) {
+ TableRow row = recoveryTable.prepareRow();
+ row.add(AddressUtil.parseAddress(server.name, Property.TSERV_CLIENTPORT).getHostName());
+ row.add(recovery.name);
+ row.add((long) recovery.runtime);
+ row.add(recovery.progress);
+ recoveryTable.addRow(row);
+ rows++;
+ }
}
- recoveryTable.generate(req, sb);
}
+ if (rows > 0)
+ recoveryTable.generate(req, sb);
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java Mon Jun 25 17:09:31 2012
@@ -167,8 +167,6 @@ public class OperationServlet extends Ba
// a dead server should have a uniq address: a logger or tserver
DeadServerList obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADTSERVERS);
obit.delete(server);
- obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADLOGGERS);
- obit.delete(server);
}
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java Mon Jun 25 17:09:31 2012
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.I
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.DeadServer;
-import org.apache.accumulo.core.master.thrift.LoggerStatus;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -80,12 +79,6 @@ public class XMLServlet extends BasicSer
sb.append("</compactions>\n");
sb.append("<tablets>").append(summary.tablets).append("</tablets>\n");
- if (status.loggers != null) {
- sb.append("<loggers>");
- for (String logger : status.loggers)
- sb.append("<logger>" + logger + "</logger>");
- sb.append("</loggers>");
- }
sb.append("<ingest>").append(summary.ingestRate).append("</ingest>\n");
sb.append("<query>").append(summary.queryRate).append("</query>\n");
@@ -110,12 +103,6 @@ public class XMLServlet extends BasicSer
}
sb.append("\n</badTabletServers>\n");
- sb.append("\n<loggers>\n");
- for (LoggerStatus entry : Monitor.getMmi().loggers) {
- sb.append(String.format("<logger id='%s'/>\n", entry.logger));
- }
- sb.append("\n</loggers>\n");
-
sb.append("\n<tabletServersShuttingDown>\n");
for (String server : Monitor.getMmi().serversShuttingDown) {
sb.append(String.format("<server id='%s'/>\n", server));
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Mon Jun 25 17:09:31 2012
@@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.util.NamingThreadFactory;
import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.commons.collections.map.LRUMap;
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Mon Jun 25 17:09:31 2012
@@ -79,7 +79,7 @@ public class Compactor implements Callab
private CompactionEnv env;
private Configuration conf;
private FileSystem fs;
- private KeyExtent extent;
+ protected KeyExtent extent;
private List<IteratorSetting> iterators;
Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Mon Jun 25 17:09:31 2012
@@ -22,10 +22,13 @@ import java.util.Map;
import java.util.Random;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
@@ -64,6 +67,15 @@ public class MinorCompactor extends Comp
});
}
+ private boolean isTableDeleting() {
+ try {
+ return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
+ } catch (Exception e) {
+ log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
+ return false; // can not get positive confirmation that its deleting.
+ }
+ }
+
@Override
public CompactionStats call() {
log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
@@ -75,7 +87,6 @@ public class MinorCompactor extends Comp
boolean reportedProblem = false;
do {
-
try {
CompactionStats ret = super.call();
@@ -117,6 +128,9 @@ public class MinorCompactor extends Comp
log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
}
+ if (isTableDeleting())
+ return new CompactionStats(0, 0);
+
} while (true);
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon Jun 25 17:09:31 2012
@@ -111,8 +111,8 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv;
import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.server.trace.TraceFileSystem;
@@ -209,7 +209,7 @@ public class Tablet {
return Tablet.this;
}
- public boolean beginUpdatingLogsUsed(ArrayList<RemoteLogger> copy, boolean mincFinish) {
+ public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
}
@@ -1178,7 +1178,6 @@ public class Tablet {
for (int i = 0; i < files.length; i++) {
paths[i] = files[i].getPath();
}
- log.debug("fs " + fs + " files: " + Arrays.toString(paths) + " location: " + location);
Collection<String> goodPaths = cleanUpFiles(fs, files, location, true);
for (String path : goodPaths) {
String filename = new Path(path).getName();
@@ -1226,17 +1225,6 @@ public class Tablet {
return datafiles;
}
- private static Set<RemoteLogger> getCurrentLoggers(List<LogEntry> entries) {
- Set<RemoteLogger> result = new HashSet<RemoteLogger>();
- for (LogEntry logEntry : entries) {
- for (String log : logEntry.logSet) {
- String[] parts = log.split("/", 2);
- result.add(new RemoteLogger(parts[0], parts[1], null));
- }
- }
- return result;
- }
-
private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
List<LogEntry> logEntries = new ArrayList<LogEntry>();
@@ -1470,7 +1458,15 @@ public class Tablet {
throw new RuntimeException(t);
}
}
- currentLogs = getCurrentLoggers(logEntries);
+ // make some closed references that represent the recovered logs
+ currentLogs = new HashSet<DfsLogger>();
+ for (LogEntry logEntry : logEntries) {
+ for (String log : logEntry.logSet) {
+ String[] parts = log.split("/", 2);
+ currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
+ }
+ }
+
log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
+ " entries created)");
}
@@ -2241,7 +2237,7 @@ public class Tablet {
private synchronized MinorCompactionTask prepareForMinC(long flushId) {
CommitSession oldCommitSession = tabletMemory.prepareForMinC();
otherLogs = currentLogs;
- currentLogs = new HashSet<RemoteLogger>();
+ currentLogs = new HashSet<DfsLogger>();
String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
@@ -3243,12 +3239,8 @@ public class Tablet {
log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> "
+ datafileManager.abs2rel(new Path(compactTmpName)));
- Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, // always
- // propagate
- // deletes,
- // unless
- // last
- // batch
+ // always propagate deletes, unless last batch
+ Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true,
acuTableConf, extent, cenv, compactionIterators);
CompactionStats mcs = compactor.call();
@@ -3650,8 +3642,18 @@ public class Tablet {
}
}
- private Set<RemoteLogger> currentLogs = new HashSet<RemoteLogger>();
+ private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
+ public Set<String> getCurrentLogs() {
+ Set<String> result = new HashSet<String>();
+ synchronized (currentLogs) {
+ for (DfsLogger log : currentLogs) {
+ result.add(log.toString());
+ }
+ }
+ return result;
+ }
+
private Set<String> beginClearingUnusedLogs() {
Set<String> doomed = new HashSet<String>();
@@ -3665,12 +3667,12 @@ public class Tablet {
if (removingLogs)
throw new IllegalStateException("Attempted to clear logs when removal of logs in progress");
- for (RemoteLogger logger : otherLogs) {
+ for (DfsLogger logger : otherLogs) {
otherLogsCopy.add(logger.toString());
doomed.add(logger.toString());
}
- for (RemoteLogger logger : currentLogs) {
+ for (DfsLogger logger : currentLogs) {
currentLogsCopy.add(logger.toString());
doomed.remove(logger.toString());
}
@@ -3698,7 +3700,7 @@ public class Tablet {
logLock.unlock();
}
- private Set<RemoteLogger> otherLogs = Collections.emptySet();
+ private Set<DfsLogger> otherLogs = Collections.emptySet();
private boolean removingLogs = false;
// this lock is basically used to synchronize writing of log info to !METADATA
@@ -3708,7 +3710,7 @@ public class Tablet {
return currentLogs.size();
}
- private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<RemoteLogger> more, boolean mincFinish) {
+ private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
boolean releaseLock = true;
@@ -3745,7 +3747,7 @@ public class Tablet {
int numAdded = 0;
int numContained = 0;
- for (RemoteLogger logger : more) {
+ for (DfsLogger logger : more) {
if (addToOther) {
if (otherLogs.add(logger))
numAdded++;
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 25 17:09:31 2012
@@ -18,11 +18,12 @@ package org.apache.accumulo.server.table
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+import java.io.EOFException;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
@@ -45,6 +46,7 @@ import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
@@ -97,7 +99,6 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.data.thrift.UpdateErrors;
import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.master.MasterNotRunningException;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -134,6 +135,8 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -157,10 +160,9 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
-import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
@@ -187,9 +189,12 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.Platform;
-import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -209,22 +214,22 @@ public class TabletServer extends Abstra
private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
private static long lastMemorySize = 0;
private static long gcTimeIncreasedCount;
- private static final Class<? extends LoggerStrategy> DEFAULT_LOGGER_STRATEGY = RoundRobinLoggerStrategy.class;
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private TabletServerLogger logger;
- private LoggerStrategy loggerStrategy;
protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
private ServerConfiguration serverConfig;
+ private LogSorter logSorter = null;
public TabletServer(ServerConfiguration conf, FileSystem fs) {
super();
this.serverConfig = conf;
this.instance = conf.getInstance();
this.fs = TraceFileSystem.wrap(fs);
+ this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
SimpleTimer.getInstance().schedule(new TimerTask() {
@Override
public void run() {
@@ -891,7 +896,7 @@ public class TabletServer extends Abstra
ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
runState.set(ScanRunState.RUNNING);
Thread.currentThread().setName(
@@ -956,8 +961,9 @@ public class TabletServer extends Abstra
Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
if (isCancelled() || session == null)
return;
-
- long maxResultsSize = acuConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+ TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
+ long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
long bytesAdded = 0;
long maxScanTime = 4000;
@@ -1819,7 +1825,7 @@ public class TabletServer extends Abstra
final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
// Root tablet assignment must take place immediately
if (extent.isRootTablet()) {
- new Thread("Root Tablet Assignment") {
+ new Daemon("Root Tablet Assignment") {
public void run() {
ah.run();
if (onlineTablets.containsKey(extent)) {
@@ -1942,12 +1948,6 @@ public class TabletServer extends Abstra
return statsKeeper.getTabletStats();
}
- @Override
- public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {
- loggerStrategy.preferLoggers(loggers);
- }
-
- @Override
public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
try {
checkPermission(credentials, null, true, "getScans");
@@ -2011,6 +2011,60 @@ public class TabletServer extends Abstra
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+ * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+ */
+ @Override
+ public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+ String myname = getClientAddressString();
+ myname = myname.replace(':', '+');
+ Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
+ Set<String> loggers = new HashSet<String>();
+ logger.getLoggers(loggers);
+ nextFile:
+ for (String filename : filenames) {
+ for (String logger : loggers) {
+ if (logger.contains(filename))
+ continue nextFile;
+ }
+ List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+ synchronized (onlineTablets) {
+ onlineTabletsCopy.addAll(onlineTablets.values());
+ }
+ for (Tablet tablet : onlineTabletsCopy) {
+ for (String current : tablet.getCurrentLogs()) {
+ if (current.contains(filename)) {
+ log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+ continue nextFile;
+ }
+ }
+ }
+ try {
+ String source = logDir + "/" + filename;
+ if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+ String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+ fs.mkdirs(new Path(walogArchive));
+ String dest = walogArchive + "/" + filename;
+ log.info("Archiving walog " + source + " to " + dest);
+ if (!fs.rename(new Path(source), new Path(dest)))
+ log.error("rename is unsuccessful");
+ } else {
+ log.info("Deleting walog " + filename);
+ if (!fs.delete(new Path(source), true))
+ log.warn("Failed to delete walog " + source);
+ if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true))
+ log.info("Deleted any recovery log " + filename);
+
+ }
+ } catch (IOException e) {
+ log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+ }
+ }
+ }
+
}
private class SplitRunner implements Runnable {
@@ -2445,7 +2499,7 @@ public class TabletServer extends Abstra
AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1);
if (extent.isMeta()) {
if (extent.isRootTablet()) {
- new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+ new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
} else {
resourceManager.addMetaDataAssignment(handler);
}
@@ -2463,7 +2517,6 @@ public class TabletServer extends Abstra
private FileSystem fs;
private Instance instance;
- private ZooCache cache;
private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@@ -2496,48 +2549,23 @@ public class TabletServer extends Abstra
return statsKeeper;
}
- public Set<String> getLoggers() throws TException, MasterNotRunningException, ThriftSecurityException {
- Set<String> allLoggers = new HashSet<String>();
- String dir = ZooUtil.getRoot(instance) + Constants.ZLOGGERS;
- for (String child : cache.getChildren(dir)) {
- allLoggers.add(new String(cache.get(dir + "/" + child)));
- }
- if (allLoggers.isEmpty()) {
- log.warn("there are no loggers registered in zookeeper");
- return allLoggers;
- }
- Set<String> result = loggerStrategy.getLoggers(Collections.unmodifiableSet(allLoggers));
- Set<String> bogus = new HashSet<String>(result);
- bogus.removeAll(allLoggers);
- if (!bogus.isEmpty())
- log.warn("logger strategy is returning loggers that are not candidates");
- result.removeAll(bogus);
- if (result.isEmpty())
- log.warn("strategy returned no useful loggers");
- return result;
- }
-
- public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
+ public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
- List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
long now = RelativeTime.currentTimeMillis();
List<String> logSet = new ArrayList<String>();
- for (RemoteLogger log : logs)
+ for (DfsLogger log : logs)
logSet.add(log.toString());
- for (RemoteLogger log : logs) {
- MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
- entry.extent = extent;
- entry.tabletId = id;
- entry.timestamp = now;
- entry.server = log.getLogger();
- entry.filename = log.getFileName();
- entry.logSet = logSet;
- entries.add(entry);
- }
- MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
+ MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
+ entry.extent = extent;
+ entry.tabletId = id;
+ entry.timestamp = now;
+ entry.server = logs.get(0).getLogger();
+ entry.filename = logs.get(0).getFileName();
+ entry.logSet = logSet;
+ MetadataTable.addLogEntry(SecurityConstants.getSystemCredentials(), entry, getLock());
}
-
+
private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK);
@@ -2650,6 +2678,12 @@ public class TabletServer extends Abstra
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
+ try {
+ logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+ } catch (Exception ex) {
+ log.error("Error setting watches for recoveries");
+ throw new RuntimeException(ex);
+ }
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
@@ -2960,31 +2994,6 @@ public class TabletServer extends Abstra
majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor()));
majorCompactorThread.setName("Split/MajC initiator");
majorCompactorThread.start();
-
- String className = getSystemConfiguration().get(Property.TSERV_LOGGER_STRATEGY);
- Class<? extends LoggerStrategy> klass = DEFAULT_LOGGER_STRATEGY;
- try {
- klass = AccumuloClassLoader.loadClass(className, LoggerStrategy.class);
- } catch (Exception ex) {
- log.warn("Unable to load class " + className + " for logger strategy, using " + klass.getName(), ex);
- }
- try {
- Constructor<? extends LoggerStrategy> constructor = klass.getConstructor(TabletServer.class);
- loggerStrategy = constructor.newInstance(this);
- loggerStrategy.init(serverConfig);
- } catch (Exception ex) {
- log.warn("Unable to create object of type " + klass.getName() + " using " + DEFAULT_LOGGER_STRATEGY.getName());
- }
- if (loggerStrategy == null) {
- try {
- loggerStrategy = DEFAULT_LOGGER_STRATEGY.getConstructor(TabletServer.class).newInstance(this);
- } catch (Exception ex) {
- log.fatal("Programmer error: cannot create a logger strategy.");
- throw new RuntimeException(ex);
- }
- }
- cache = new ZooCache();
-
}
public TabletServerStatus getStats(Map<String,MapCounter<ScanRunState>> scanCounts) {
@@ -3065,12 +3074,11 @@ public class TabletServer extends Abstra
result.name = getClientAddressString();
result.holdTime = resourceManager.holdTime();
result.lookups = seekCount.get();
- result.loggers = new HashSet<String>();
result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount();
result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount();
result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
- logger.getLoggers(result.loggers);
+ result.logSorts = logSorter.getLogSorts();
return result;
}
@@ -3082,6 +3090,7 @@ public class TabletServer extends Abstra
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration conf = new ServerConfiguration(instance);
Accumulo.init(fs, conf, "tserver");
+ recoverLocalWriteAheadLogs(fs, conf);
TabletServer server = new TabletServer(conf, fs);
server.config(hostname);
Accumulo.enableTracing(hostname, "tserver");
@@ -3090,7 +3099,61 @@ public class TabletServer extends Abstra
log.error("Uncaught exception in TabletServer.main, exiting", ex);
}
}
-
+
+ /**
+ * Copy local walogs into HDFS on an upgrade
+ *
+ */
+ public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
+ FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
+ AccumuloConfiguration conf = serverConf.getConfiguration();
+ String localWalDirectories = conf.get(Property.LOGGER_DIR);
+ for (String localWalDirectory : localWalDirectories.split(",")) {
+ if (!localWalDirectory.startsWith("/")) {
+ localWalDirectory = System.getenv("ACCUMULO_HOME") + "/" + localWalDirectory;
+ }
+
+ FileStatus status = null;
+ try {
+ status = localfs.getFileStatus(new Path(localWalDirectory));
+ } catch (FileNotFoundException fne) {}
+
+ if (status == null || !status.isDir()) {
+ log.debug("Local walog dir " + localWalDirectory + " not found ");
+ continue;
+ }
+
+ for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
+ String name = file.getPath().getName();
+ try {
+ UUID.fromString(name);
+ } catch (IllegalArgumentException ex) {
+ log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
+ continue;
+ }
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+ log.info("Openning local log " + file.getPath());
+ Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
+ Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
+ FSDataOutputStream writer = fs.create(tmp);
+ while (reader.next(key, value)) {
+ try {
+ key.write(writer);
+ value.write(writer);
+ } catch (EOFException ex) {
+ break;
+ }
+ }
+ writer.close();
+ reader.close();
+ fs.rename(tmp, new Path(tmp.getParent(), name));
+ log.info("Copied local log " + name);
+ localfs.delete(new Path(localWalDirectory, name), true);
+ }
+ }
+ }
+
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions++;
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3113,7 +3176,7 @@ public class TabletServer extends Abstra
String recovery = null;
for (String log : entry.logSet) {
String[] parts = log.split("/"); // "host:port/filename"
- log = ServerConstants.getRecoveryDir() + "/" + parts[1] + ".recovered";
+ log = ServerConstants.getRecoveryDir() + "/" + parts[1];
Path finished = new Path(log + "/finished");
TabletServer.log.info("Looking for " + finished);
if (fs.exists(finished)) {
@@ -3306,11 +3369,27 @@ public class TabletServer extends Abstra
return METRICS_PREFIX;
}
- // public AccumuloConfiguration getTableConfiguration(String tableId) {
- // return ServerConfiguration.getTableConfiguration(instance, tableId);
- // }
-
public TableConfiguration getTableConfiguration(KeyExtent extent) {
return ServerConfiguration.getTableConfiguration(instance, extent.getTableId().toString());
}
+ public DfsLogger.ServerResources getServerConfig() {
+ return new DfsLogger.ServerResources() {
+
+ @Override
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ @Override
+ public Set<TServerInstance> getCurrentTServers() {
+ return null;
+ }
+
+ @Override
+ public AccumuloConfiguration getConfiguration() {
+ return getSystemConfiguration();
+ }
+ };
+ }
+
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Mon Jun 25 17:09:31 2012
@@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.util.NamingThreadFactory;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
import org.apache.hadoop.fs.FileSystem;
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Jun 25 17:09:31 2012
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -35,13 +34,12 @@ import org.apache.accumulo.core.conf.Pro
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.tabletserver.Tablet;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
import org.apache.accumulo.server.tabletserver.TabletServer;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
import org.apache.log4j.Logger;
/**
@@ -61,7 +59,7 @@ public class TabletServerLogger {
private final TabletServer tserver;
// The current log set: always updated to a new set with every change of loggers
- private final List<RemoteLogger> loggers = new ArrayList<RemoteLogger>();
+ private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
// The current generation of logSet.
// Because multiple threads can be using a log set at one time, a log
@@ -134,7 +132,7 @@ public class TabletServerLogger {
this.maxSize = maxSize;
}
- private int initializeLoggers(final List<RemoteLogger> copy) throws IOException {
+ private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
final int[] result = {-1};
testLockAndRun(logSetLock, new TestCallWithWriteLock() {
boolean test() {
@@ -165,8 +163,8 @@ public class TabletServerLogger {
public void getLoggers(Set<String> loggersOut) {
logSetLock.readLock().lock();
try {
- for (RemoteLogger logger : loggers) {
- loggersOut.add(logger.getLogger());
+ for (DfsLogger logger : loggers) {
+ loggersOut.add(logger.toString());
}
} finally {
logSetLock.readLock().unlock();
@@ -183,35 +181,13 @@ public class TabletServerLogger {
}
try {
- while (true) {
- Set<String> loggerAddresses = tserver.getLoggers();
- if (!loggerAddresses.isEmpty()) {
- for (String logger : loggerAddresses) {
- try {
- loggers.add(new RemoteLogger(logger, UUID.randomUUID(), tserver.getSystemConfiguration()));
- } catch (LoggerClosedException t) {
- close();
- break;
- } catch (Exception t) {
- close();
- log.warn("Unable to connect to " + logger + ": " + t);
- break;
- }
- }
-
- if (loggers.size() == loggerAddresses.size())
- break;
- if (loggers.size() > 0) {
- // something is screwy, loggers.size() should be 0 or loggerAddresses.size()..
- throw new RuntimeException("Unexpected number of loggers " + loggers.size() + " " + loggerAddresses.size());
- }
- }
- UtilWaitThread.sleep(1000);
- }
+ DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+ alog.open(tserver.getClientAddressString());
+ loggers.add(alog);
logSetId.incrementAndGet();
return;
} catch (Exception t) {
- throw new IOException(t);
+ throw new RuntimeException(t);
}
}
@@ -229,13 +205,13 @@ public class TabletServerLogger {
throw new IllegalStateException("close should be called with write lock held!");
}
try {
- for (RemoteLogger logger : loggers) {
+ for (DfsLogger logger : loggers) {
try {
logger.close();
- } catch (LoggerClosedException ex) {
- // expected
+ } catch (DfsLogger.LogClosedException ex) {
+ // ignore
} catch (Throwable ex) {
- log.error("Unable to cleanly close logger " + logger.getLogger() + ": " + ex);
+ log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
}
}
loggers.clear();
@@ -246,7 +222,7 @@ public class TabletServerLogger {
}
interface Writer {
- LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
+ LoggerOperation write(DfsLogger logger, int seq) throws Exception;
}
private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -265,7 +241,7 @@ public class TabletServerLogger {
while (!success) {
try {
// get a reference to the loggers that no other thread can touch
- ArrayList<RemoteLogger> copy = new ArrayList<RemoteLogger>();
+ ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
currentLogSet = initializeLoggers(copy);
// add the logger to the log set for the memory in the tablet,
@@ -294,7 +270,7 @@ public class TabletServerLogger {
if (seq < 0)
throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
- for (RemoteLogger wal : copy) {
+ for (DfsLogger wal : copy) {
LoggerOperation lop = writer.write(wal, seq);
if (lop != null)
queuedOperations.add(lop);
@@ -307,13 +283,11 @@ public class TabletServerLogger {
// double-check: did the log set change?
success = (currentLogSet == logSetId.get());
}
+ } catch (DfsLogger.LogClosedException ex) {
+ log.debug("Logs closed while writing, retrying " + (attempt + 1));
} catch (Exception t) {
- if (attempt == 0) {
- log.info("Log write failed: another thread probably closed the log");
- } else {
- log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
- UtilWaitThread.sleep(100);
- }
+ log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+ UtilWaitThread.sleep(100);
} finally {
attempt++;
}
@@ -356,7 +330,7 @@ public class TabletServerLogger {
return -1;
return write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
return null;
}
@@ -368,7 +342,7 @@ public class TabletServerLogger {
return -1;
int seq = write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
return logger.log(tabletSeq, commitSession.getLogId(), m);
}
});
@@ -388,7 +362,7 @@ public class TabletServerLogger {
int seq = write(loggables.keySet(), false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
@@ -419,7 +393,7 @@ public class TabletServerLogger {
int seq = write(commitSession, true, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
return null;
}
@@ -435,7 +409,7 @@ public class TabletServerLogger {
return -1;
write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
return null;
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Mon Jun 25 17:09:31 2012
@@ -90,9 +90,6 @@ public class GetMasterStats {
out(2, "Time Difference %.1f", ((now - server.lastContact) / 1000.));
out(2, "Total Records %d", summary.recs);
out(2, "Lookups %d", server.lookups);
- out(2, "Loggers %d", server.loggers.size());
- for (String logger : server.loggers)
- out(3, "Logger %s", logger);
if (server.holdTime > 0)
out(2, "Hold Time %d", server.holdTime);
if (server.tableMap != null && server.tableMap.size() > 0) {
@@ -111,16 +108,12 @@ public class GetMasterStats {
out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : info.minor.queued);
}
}
- }
- }
- if (stats.recovery != null && stats.recovery.size() > 0) {
- out(0, "Recovery");
- for (RecoveryStatus r : stats.recovery) {
- out(1, "Log Server %s", r.host);
- out(1, "Log Name %s", r.name);
- out(1, "Map Progress: %.2f%%", r.mapProgress * 100);
- out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100);
- out(1, "Time running: %s", r.runtime / 1000.);
+ out(2, "Recoveries %d", server.logSorts.size());
+ for (RecoveryStatus sort : server.logSorts) {
+ out(3, "File %s", sort.name);
+ out(3, "Progress %.2f%%", sort.progress * 100);
+ out(3, "Time running %s", sort.runtime / 1000.);
+ }
}
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java Mon Jun 25 17:09:31 2012
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.B
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -67,11 +68,11 @@ public class ContinuousIngest {
args = processOptions(args);
- if (args.length != 13) {
+ if (args.length != 14) {
throw new IllegalArgumentException(
"usage : "
+ ContinuousIngest.class.getName()
- + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
+ + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <num> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
}
if (debugLog != null) {
@@ -89,16 +90,17 @@ public class ContinuousIngest {
String table = args[4];
- long min = Long.parseLong(args[5]);
- long max = Long.parseLong(args[6]);
- short maxColF = Short.parseShort(args[7]);
- short maxColQ = Short.parseShort(args[8]);
-
- long maxMemory = Long.parseLong(args[9]);
- long maxLatency = Integer.parseInt(args[10]);
- int maxWriteThreads = Integer.parseInt(args[11]);
+ long num = Long.parseLong(args[5]);
+ long min = Long.parseLong(args[6]);
+ long max = Long.parseLong(args[7]);
+ short maxColF = Short.parseShort(args[8]);
+ short maxColQ = Short.parseShort(args[9]);
+
+ long maxMemory = Long.parseLong(args[10]);
+ long maxLatency = Integer.parseInt(args[11]);
+ int maxWriteThreads = Integer.parseInt(args[12]);
- boolean checksum = Boolean.parseBoolean(args[12]);
+ boolean checksum = Boolean.parseBoolean(args[13]);
if (min < 0 || max < 0 || max <= min) {
throw new IllegalArgumentException("bad min and max");
@@ -109,6 +111,11 @@ public class ContinuousIngest {
String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
Tracer.getInstance().addReceiver(new ZooSpanClient(zooKeepers, path, localhost, "cingest", 1000));
+ if (!conn.tableOperations().exists(table))
+ try {
+ conn.tableOperations().create(table);
+ } catch (TableExistsException tee) {}
+
BatchWriter bw = conn.createBatchWriter(table, maxMemory, maxLatency, maxWriteThreads);
bw = Trace.wrapAll(bw, new CountSampler(1024));
@@ -133,7 +140,7 @@ public class ContinuousIngest {
long lastFlushTime = System.currentTimeMillis();
- while (true) {
+ out: while (true) {
// generate first set of nodes
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(min, max, r);
@@ -152,6 +159,8 @@ public class ContinuousIngest {
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= num)
+ break out;
// generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
@@ -165,6 +174,8 @@ public class ContinuousIngest {
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= num)
+ break out;
}
// create one big linked list, this makes all of the first inserts
@@ -175,7 +186,11 @@ public class ContinuousIngest {
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= num)
+ break out;
}
+
+ bw.close();
}
private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java Mon Jun 25 17:09:31 2012
@@ -95,6 +95,26 @@ public class BadIteratorMincTest extends
if (count != 1)
throw new Exception("Did not see expected # entries " + count);
+
+ // now try putting bad iterator back and deleting the table
+ getConnector().tableOperations().setProperty("foo", Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.badi", "30," + BadIterator.class.getName());
+ bw = getConnector().createBatchWriter("foo", 1000000, 60000l, 2);
+ m = new Mutation(new Text("r2"));
+ m.put(new Text("acf"), new Text("foo"), new Value("1".getBytes()));
+ bw.addMutation(m);
+ bw.close();
+
+ // make sure property is given time to propagate
+ UtilWaitThread.sleep(1000);
+
+ getConnector().tableOperations().flush("foo", null, null, false);
+
+ // make sure the flush has time to start
+ UtilWaitThread.sleep(1000);
+
+ // this should not hang
+ getConnector().tableOperations().delete("foo");
+
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java Mon Jun 25 17:09:31 2012
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
-import org.apache.accumulo.server.logger.IdentityReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -135,7 +134,7 @@ public class RunTests extends Configured
job.setOutputValueClass(Text.class);
// don't do anything with the results (yet) a summary would be nice
- job.setReducerClass(IdentityReducer.class);
+ job.setNumReduceTasks(0);
// submit the job
log.info("Starting tests");
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Mon Jun 25 17:09:31 2012
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.accumulo.cloudtrace.thrift.TInfo;
import org.apache.accumulo.core.client.Instance;
@@ -164,9 +163,6 @@ public class NullTserver {
public void unloadTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
@Override
- public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {}
-
- @Override
public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveScan>();
}
@@ -188,6 +184,16 @@ public class NullTserver {
public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
}
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+ * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+ */
+ @Override
+ public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+ }
}
public static void main(String[] args) throws Exception {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Mon Jun 25 17:09:31 2012
@@ -19,11 +19,7 @@ package org.apache.accumulo.server.test.
import java.net.InetAddress;
import java.util.Properties;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
@@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.server.test.randomwalk.State;
import org.apache.accumulo.server.test.randomwalk.Test;
import org.apache.hadoop.fs.FileSystem;
public class Setup extends Test {
- private static final int CORE_POOL_SIZE = 8;
- private static final int MAX_POOL_SIZE = CORE_POOL_SIZE;
+ private static final int MAX_POOL_SIZE = 8;
static String tableName = null;
@Override
@@ -67,14 +61,7 @@ public class Setup extends Test {
state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
BulkPlusOne.counter.set(0l);
- BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
- ThreadFactory factory = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Daemon(new LoggingRunnable(log, r));
- }
- };
- ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory);
+ ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool");
state.set("pool", e);
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/CheckBalance.java Mon Jun 25 17:09:31 2012
@@ -59,7 +59,7 @@ public class CheckBalance extends Test {
// Check for even # of tablets on each node
boolean balanced = true;
for (Entry<String,Long> entry : counts.entrySet()) {
- if (Math.abs(entry.getValue().longValue() - average) > 1) {
+ if (Math.abs(entry.getValue().longValue() - average) > Math.max(1, average / 5)) {
balanced = false;
break;
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java Mon Jun 25 17:09:31 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab
// If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually
public class TraceFileSystem extends FileSystem {
+
@Override
public void setConf(Configuration conf) {
Span span = Trace.start("setConf");
@@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil
this.impl = impl;
}
+ public FileSystem getImplementation() {
+ return impl;
+ }
+
@Override
public URI getUri() {
Span span = Trace.start("getUri");
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Mon Jun 25 17:09:31 2012
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.conf.S
public class AddressUtil {
static public InetSocketAddress parseAddress(String address, Property portDefaultProperty) {
- final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
+ final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(portDefaultProperty);
return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
}