You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/06/05 15:18:25 UTC
svn commit: r1346380 [5/5] - in /accumulo/trunk: ./ bin/
core/src/main/java/org/apache/accumulo/core/
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accum...
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jun 5 13:18:22 2012
@@ -18,11 +18,11 @@ package org.apache.accumulo.server.table
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
@@ -45,6 +45,7 @@ import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
@@ -97,7 +98,6 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.data.thrift.UpdateErrors;
import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.master.MasterNotRunningException;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -136,6 +136,8 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -159,10 +161,9 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
-import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
@@ -189,9 +190,12 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.Platform;
-import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -211,22 +215,22 @@ public class TabletServer extends Abstra
private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
private static long lastMemorySize = 0;
private static long gcTimeIncreasedCount;
- private static final Class<? extends LoggerStrategy> DEFAULT_LOGGER_STRATEGY = RoundRobinLoggerStrategy.class;
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private TabletServerLogger logger;
- private LoggerStrategy loggerStrategy;
protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
private ServerConfiguration serverConfig;
+ private LogSorter logSorter = null;
public TabletServer(ServerConfiguration conf, FileSystem fs) {
super();
this.serverConfig = conf;
this.instance = conf.getInstance();
this.fs = TraceFileSystem.wrap(fs);
+ this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
SimpleTimer.getInstance().schedule(new TimerTask() {
@Override
public void run() {
@@ -897,7 +901,7 @@ public class TabletServer extends Abstra
ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
runState.set(ScanRunState.RUNNING);
Thread.currentThread().setName(
@@ -962,7 +966,7 @@ public class TabletServer extends Abstra
Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
if (isCancelled() || session == null)
return;
-
+
TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
long bytesAdded = 0;
@@ -1850,7 +1854,7 @@ public class TabletServer extends Abstra
final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
// Root tablet assignment must take place immediately
if (extent.isRootTablet()) {
- new Thread("Root Tablet Assignment") {
+ new Daemon("Root Tablet Assignment") {
public void run() {
ah.run();
if (onlineTablets.containsKey(extent)) {
@@ -1973,12 +1977,6 @@ public class TabletServer extends Abstra
return statsKeeper.getTabletStats();
}
- @Override
- public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {
- loggerStrategy.preferLoggers(loggers);
- }
-
- @Override
public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
try {
@@ -2042,6 +2040,60 @@ public class TabletServer extends Abstra
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+ * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+ */
+ @Override
+ public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+ String myname = getClientAddressString();
+ myname = myname.replace(':', '+');
+ Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
+ Set<String> loggers = new HashSet<String>();
+ logger.getLoggers(loggers);
+ nextFile:
+ for (String filename : filenames) {
+ for (String logger : loggers) {
+ if (logger.contains(filename))
+ continue nextFile;
+ }
+ List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+ synchronized (onlineTablets) {
+ onlineTabletsCopy.addAll(onlineTablets.values());
+ }
+ for (Tablet tablet : onlineTabletsCopy) {
+ for (String current : tablet.getCurrentLogs()) {
+ if (current.contains(filename)) {
+ log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+ continue nextFile;
+ }
+ }
+ }
+ try {
+ String source = logDir + "/" + filename;
+ if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+ String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+ fs.mkdirs(new Path(walogArchive));
+ String dest = walogArchive + "/" + filename;
+ log.info("Archiving walog " + source + " to " + dest);
+ if (!fs.rename(new Path(source), new Path(dest)))
+ log.error("rename is unsuccessful");
+ } else {
+ log.info("Deleting walog " + filename);
+ if (!fs.delete(new Path(source), true))
+ log.warn("Failed to delete walog " + source);
+ if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true))
+ log.info("Deleted any recovery log " + filename);
+
+ }
+ } catch (IOException e) {
+ log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+ }
+ }
+ }
+
}
private class SplitRunner implements Runnable {
@@ -2476,7 +2528,7 @@ public class TabletServer extends Abstra
AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1);
if (extent.isMeta()) {
if (extent.isRootTablet()) {
- new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+ new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
} else {
resourceManager.addMetaDataAssignment(handler);
}
@@ -2494,7 +2546,6 @@ public class TabletServer extends Abstra
private FileSystem fs;
private Instance instance;
- private ZooCache cache;
private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@@ -2527,48 +2578,23 @@ public class TabletServer extends Abstra
return statsKeeper;
}
- public Set<String> getLoggers() throws TException, MasterNotRunningException, ThriftSecurityException {
- Set<String> allLoggers = new HashSet<String>();
- String dir = ZooUtil.getRoot(instance) + Constants.ZLOGGERS;
- for (String child : cache.getChildren(dir)) {
- allLoggers.add(new String(cache.get(dir + "/" + child)));
- }
- if (allLoggers.isEmpty()) {
- log.warn("there are no loggers registered in zookeeper");
- return allLoggers;
- }
- Set<String> result = loggerStrategy.getLoggers(Collections.unmodifiableSet(allLoggers));
- Set<String> bogus = new HashSet<String>(result);
- bogus.removeAll(allLoggers);
- if (!bogus.isEmpty())
- log.warn("logger strategy is returning loggers that are not candidates");
- result.removeAll(bogus);
- if (result.isEmpty())
- log.warn("strategy returned no useful loggers");
- return result;
- }
-
- public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
+ public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
- List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
long now = RelativeTime.currentTimeMillis();
List<String> logSet = new ArrayList<String>();
- for (RemoteLogger log : logs)
+ for (DfsLogger log : logs)
logSet.add(log.toString());
- for (RemoteLogger log : logs) {
- MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
- entry.extent = extent;
- entry.tabletId = id;
- entry.timestamp = now;
- entry.server = log.getLogger();
- entry.filename = log.getFileName();
- entry.logSet = logSet;
- entries.add(entry);
- }
- MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
+ MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
+ entry.extent = extent;
+ entry.tabletId = id;
+ entry.timestamp = now;
+ entry.server = logs.get(0).getLogger();
+ entry.filename = logs.get(0).getFileName();
+ entry.logSet = logSet;
+ MetadataTable.addLogEntry(SecurityConstants.getSystemCredentials(), entry, getLock());
}
-
+
private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK);
@@ -2681,6 +2707,12 @@ public class TabletServer extends Abstra
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
+ try {
+ logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+ } catch (Exception ex) {
+ log.error("Error setting watches for recoveries");
+ throw new RuntimeException(ex);
+ }
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
@@ -2991,31 +3023,6 @@ public class TabletServer extends Abstra
majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor()));
majorCompactorThread.setName("Split/MajC initiator");
majorCompactorThread.start();
-
- String className = getSystemConfiguration().get(Property.TSERV_LOGGER_STRATEGY);
- Class<? extends LoggerStrategy> klass = DEFAULT_LOGGER_STRATEGY;
- try {
- klass = AccumuloClassLoader.loadClass(className, LoggerStrategy.class);
- } catch (Exception ex) {
- log.warn("Unable to load class " + className + " for logger strategy, using " + klass.getName(), ex);
- }
- try {
- Constructor<? extends LoggerStrategy> constructor = klass.getConstructor(TabletServer.class);
- loggerStrategy = constructor.newInstance(this);
- loggerStrategy.init(serverConfig);
- } catch (Exception ex) {
- log.warn("Unable to create object of type " + klass.getName() + " using " + DEFAULT_LOGGER_STRATEGY.getName());
- }
- if (loggerStrategy == null) {
- try {
- loggerStrategy = DEFAULT_LOGGER_STRATEGY.getConstructor(TabletServer.class).newInstance(this);
- } catch (Exception ex) {
- log.fatal("Programmer error: cannot create a logger strategy.");
- throw new RuntimeException(ex);
- }
- }
- cache = new ZooCache();
-
}
public TabletServerStatus getStats(Map<String,MapCounter<ScanRunState>> scanCounts) {
@@ -3096,12 +3103,11 @@ public class TabletServer extends Abstra
result.name = getClientAddressString();
result.holdTime = resourceManager.holdTime();
result.lookups = seekCount.get();
- result.loggers = new HashSet<String>();
result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount();
result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount();
result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
- logger.getLoggers(result.loggers);
+ result.logSorts = logSorter.getLogSorts();
return result;
}
@@ -3113,6 +3119,7 @@ public class TabletServer extends Abstra
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration conf = new ServerConfiguration(instance);
Accumulo.init(fs, conf, "tserver");
+ // recoverLocalWriteAheadLogs(fs, conf);
TabletServer server = new TabletServer(conf, fs);
server.config(hostname);
Accumulo.enableTracing(hostname, "tserver");
@@ -3121,7 +3128,47 @@ public class TabletServer extends Abstra
log.error("Uncaught exception in TabletServer.main, exiting", ex);
}
}
-
+
+ /**
+ * Copy local walogs into HDFS on an upgrade
+ *
+ */
+ public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
+ FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
+ AccumuloConfiguration conf = serverConf.getConfiguration();
+ String localWalDirectories = conf.get(Property.LOGGER_DIR);
+ for (String localWalDirectory : localWalDirectories.split(",")) {
+ for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
+ String name = file.getPath().getName();
+ try {
+ UUID.fromString(name);
+ } catch (IllegalArgumentException ex) {
+ log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
+ continue;
+ }
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+ log.info("Openning local log " + file.getPath());
+ Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
+ Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
+ FSDataOutputStream writer = fs.create(tmp);
+ while (reader.next(key, value)) {
+ try {
+ key.write(writer);
+ value.write(writer);
+ } catch (EOFException ex) {
+ break;
+ }
+ }
+ writer.close();
+ reader.close();
+ fs.rename(tmp, new Path(tmp.getParent(), name));
+ log.info("Copied local log " + name);
+ localfs.delete(new Path(localWalDirectory, name), true);
+ }
+ }
+ }
+
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions++;
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3144,7 +3191,7 @@ public class TabletServer extends Abstra
String recovery = null;
for (String log : entry.logSet) {
String[] parts = log.split("/"); // "host:port/filename"
- log = ServerConstants.getRecoveryDir() + "/" + parts[1] + ".recovered";
+ log = ServerConstants.getRecoveryDir() + "/" + parts[1];
Path finished = new Path(log + "/finished");
TabletServer.log.info("Looking for " + finished);
if (fs.exists(finished)) {
@@ -3337,11 +3384,27 @@ public class TabletServer extends Abstra
return METRICS_PREFIX;
}
- // public AccumuloConfiguration getTableConfiguration(String tableId) {
- // return ServerConfiguration.getTableConfiguration(instance, tableId);
- // }
-
public TableConfiguration getTableConfiguration(KeyExtent extent) {
return ServerConfiguration.getTableConfiguration(instance, extent.getTableId().toString());
}
+ public DfsLogger.ServerResources getServerConfig() {
+ return new DfsLogger.ServerResources() {
+
+ @Override
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ @Override
+ public Set<TServerInstance> getCurrentTServers() {
+ return null;
+ }
+
+ @Override
+ public AccumuloConfiguration getConfiguration() {
+ return getSystemConfiguration();
+ }
+ };
+ }
+
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Tue Jun 5 13:18:22 2012
@@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.util.NamingThreadFactory;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
import org.apache.hadoop.fs.FileSystem;
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Tue Jun 5 13:18:22 2012
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.server.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a connection to a logger.
+ *
+ */
+public class DfsLogger {
+ private static Logger log = Logger.getLogger(DfsLogger.class);
+
+ public static class LogClosedException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public LogClosedException() {
+ super("LogClosed");
+ }
+ }
+
+ public interface ServerResources {
+ AccumuloConfiguration getConfiguration();
+
+ FileSystem getFileSystem();
+
+ Set<TServerInstance> getCurrentTServers();
+ }
+
+ private LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
+
+ private String closeLock = new String("foo");
+
+ private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
+
+ private static final LogFileValue EMPTY = new LogFileValue();
+
+ private boolean closed = false;
+
+ private class LogSyncingTask implements Runnable {
+
+ @Override
+ public void run() {
+ ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
+ while (true) {
+ work.clear();
+
+ try {
+ work.add(workQueue.take());
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ workQueue.drainTo(work);
+
+ synchronized (closeLock) {
+ if (!closed) {
+ try {
+ logFile.sync();
+ } catch (IOException ex) {
+ log.warn("Exception syncing " + ex);
+ for (DfsLogger.LogWork logWork : work) {
+ logWork.exception = ex;
+ }
+ }
+ } else {
+ for (DfsLogger.LogWork logWork : work) {
+ logWork.exception = new LogClosedException();
+ }
+ }
+ }
+
+ boolean sawClosedMarker = false;
+ for (DfsLogger.LogWork logWork : work)
+ if (logWork == CLOSED_MARKER)
+ sawClosedMarker = true;
+ else
+ logWork.latch.countDown();
+
+ if (sawClosedMarker) {
+ synchronized (closeLock) {
+ closeLock.notifyAll();
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ static class LogWork {
+ List<TabletMutations> mutations;
+ CountDownLatch latch;
+ volatile Exception exception;
+
+ public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
+ this.mutations = mutations;
+ this.latch = latch;
+ }
+ }
+
+ public static class LoggerOperation {
+ private LogWork work;
+
+ public LoggerOperation(LogWork work) {
+ this.work = work;
+ }
+
+ public void await() throws IOException {
+ try {
+ work.latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (work.exception != null) {
+ if (work.exception instanceof IOException)
+ throw (IOException) work.exception;
+ else if (work.exception instanceof RuntimeException)
+ throw (RuntimeException) work.exception;
+ else
+ throw new RuntimeException(work.exception);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ // filename is unique
+ if (obj == null)
+ return false;
+ if (obj instanceof DfsLogger)
+ return getFileName().equals(((DfsLogger) obj).getFileName());
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ // filename is unique
+ return getFileName().hashCode();
+ }
+
+ private ServerResources conf;
+ private FSDataOutputStream logFile;
+ private Path logPath;
+ private String logger;
+
+ public DfsLogger(ServerResources conf) throws IOException {
+ this.conf = conf;
+ }
+
+ public DfsLogger(ServerResources conf, String logger, String filename) throws IOException {
+ this.conf = conf;
+ this.logger = logger;
+ this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+ }
+
+ public synchronized void open(String address) throws IOException {
+ String filename = UUID.randomUUID().toString();
+ logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
+
+ logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename);
+ try {
+ FileSystem fs = conf.getFileSystem();
+ short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
+ if (replication == 0)
+ replication = (short) fs.getDefaultReplication();
+ long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
+ if (blockSize == 0)
+ blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+ int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
+ blockSize -= blockSize % checkSum;
+ blockSize = Math.max(blockSize, checkSum);
+ logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+ LogFileKey key = new LogFileKey();
+ key.event = OPEN;
+ key.tserverSession = filename;
+ key.filename = filename;
+ write(key, EMPTY);
+ log.debug("Got new write-ahead log: " + this);
+ } catch (IOException ex) {
+ if (logFile != null)
+ logFile.close();
+ logFile = null;
+ throw ex;
+ }
+
+ Thread t = new Daemon(new LogSyncingTask());
+ t.setName("Accumulo WALog thread " + toString());
+ t.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
+ */
+ @Override
+ public String toString() {
+ return getLogger() + "/" + getFileName();
+ }
+
+ public String getLogger() {
+ return logger;
+ }
+
+ public String getFileName() {
+ return logPath.getName();
+ }
+
+ public void close() throws IOException {
+
+ synchronized (closeLock) {
+ if (closed)
+ return;
+ // after closed is set to true, nothing else should be added to the queue
+ // CLOSED_MARKER should be the last thing on the queue, therefore when the
+ // background thread sees the marker and exits there should be nothing else
+ // to process... so nothing should be left waiting for the background
+ // thread to do work
+ closed = true;
+ workQueue.add(CLOSED_MARKER);
+ while (!workQueue.isEmpty())
+ try {
+ closeLock.wait();
+ } catch (InterruptedException e) {
+ log.info("Interrupted");
+ }
+ }
+
+ if (logFile != null)
+ try {
+ logFile.close();
+ } catch (IOException ex) {
+ log.error(ex);
+ throw new LogClosedException();
+ }
+ }
+
+ public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
+ // write this log to the METADATA table
+ final LogFileKey key = new LogFileKey();
+ key.event = DEFINE_TABLET;
+ key.seq = seq;
+ key.tid = tid;
+ key.tablet = tablet;
+ try {
+ write(key, EMPTY);
+ logFile.sync();
+ } catch (IOException ex) {
+ log.error(ex);
+ throw ex;
+ }
+ }
+
+ /**
+ * @param key
+ * @param empty2
+ * @throws IOException
+ */
+ private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
+ key.write(logFile);
+ value.write(logFile);
+ }
+
+ public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
+ return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
+ }
+
+ public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+ DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
+
+ synchronized (DfsLogger.this) {
+ try {
+ for (TabletMutations mutation : mutations) {
+ LogFileKey key = new LogFileKey();
+ key.event = MANY_MUTATIONS;
+ key.seq = mutation.seq;
+ key.tid = mutation.tabletID;
+ LogFileValue value = new LogFileValue();
+ Mutation[] m = new Mutation[mutation.mutations.size()];
+ for (int i = 0; i < m.length; i++)
+ m[i] = new Mutation(mutation.mutations.get(i));
+ value.mutations = m;
+ write(key, value);
+ }
+ } catch (Exception e) {
+ log.error(e, e);
+ work.exception = e;
+ }
+ }
+
+ synchronized (closeLock) {
+ // use a different lock for close check so that adding to work queue does not need
+ // to wait on walog I/O operations
+
+ if (closed)
+ throw new LogClosedException();
+ workQueue.add(work);
+ }
+
+ return new LoggerOperation(work);
+ }
+
+ public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
+ LogFileKey key = new LogFileKey();
+ key.event = COMPACTION_FINISH;
+ key.seq = seq;
+ key.tid = tid;
+ try {
+ write(key, EMPTY);
+ } catch (IOException ex) {
+ log.error(ex);
+ throw ex;
+ }
+ }
+
+ public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
+ LogFileKey key = new LogFileKey();
+ key.event = COMPACTION_START;
+ key.seq = seq;
+ key.tid = tid;
+ key.filename = fqfn;
+ try {
+ write(key, EMPTY);
+ } catch (IOException ex) {
+ log.error(ex);
+ throw ex;
+ }
+ }
+
+}
Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Tue Jun 5 13:18:22 2012
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.RecoveryStatus;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ *
+ */
+public class LogSorter {
+
+ private static final Logger log = Logger.getLogger(LogSorter.class);
+ FileSystem fs;
+ AccumuloConfiguration conf;
+
+ private Map<String,Work> currentWork = new HashMap<String,Work>();
+
+ class Work implements Runnable {
+ final String name;
+ FSDataInputStream input;
+ final String destPath;
+ long bytesCopied = -1;
+ long sortStart = 0;
+ long sortStop = -1;
+ private final LogSortNotifier cback;
+
+ synchronized long getBytesCopied() throws IOException {
+ return input == null ? bytesCopied : input.getPos();
+ }
+
+ Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
+ this.name = name;
+ this.input = input;
+ this.destPath = destPath;
+ this.cback = cback;
+ }
+ synchronized boolean finished() {
+ return input == null;
+ }
+ public void run() {
+ sortStart = System.currentTimeMillis();
+ String formerThreadName = Thread.currentThread().getName();
+ int part = 0;
+ try {
+ final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+ Thread.currentThread().setName("Sorting " + name + " for recovery");
+ while (true) {
+ final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+ try {
+ long start = input.getPos();
+ while (input.getPos() - start < bufferSize) {
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+ key.readFields(input);
+ value.readFields(input);
+ buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+ }
+ writeBuffer(buffer, part++);
+ buffer.clear();
+ } catch (EOFException ex) {
+ writeBuffer(buffer, part++);
+ break;
+ }
+ }
+ fs.create(new Path(destPath, "finished")).close();
+ log.info("Log copy/sort of " + name + " complete");
+ } catch (Throwable t) {
+ try {
+ fs.create(new Path(destPath, "failed")).close();
+ } catch (IOException e) {
+ log.error("Error creating failed flag file " + name, e);
+ }
+ log.error(t, t);
+ try {
+ cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
+ } catch (Exception ex) {
+ log.error("Strange error notifying the master of a logSort problem for file " + name);
+ }
+ } finally {
+ Thread.currentThread().setName(formerThreadName);
+ try {
+ close();
+ } catch (IOException e) {
+ log.error("Error during cleanup sort/copy " + name, e);
+ }
+ sortStop = System.currentTimeMillis();
+ synchronized (currentWork) {
+ currentWork.remove(name);
+ }
+ try {
+ cback.notice(name, getBytesCopied(), part, getSortTime(), "");
+ } catch (Exception ex) {
+ log.error("Strange error reporting successful log sort " + name, ex);
+ }
+ }
+ }
+
+ private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ String path = destPath + String.format("/part-r-%05d", part++);
+ MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+ try {
+ Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
+ @Override
+ public int compare(Pair<LogFileKey,LogFileValue> o1, Pair<LogFileKey,LogFileValue> o2) {
+ return o1.getFirst().compareTo(o2.getFirst());
+ }
+ });
+ for (Pair<LogFileKey,LogFileValue> entry : buffer) {
+ output.append(entry.getFirst(), entry.getSecond());
+ }
+ } finally {
+ output.close();
+ }
+ }
+
+ synchronized void close() throws IOException {
+ bytesCopied = input.getPos();
+ input.close();
+ input = null;
+ }
+
+ public synchronized long getSortTime() {
+ if (sortStart > 0) {
+ if (sortStop > 0)
+ return sortStop - sortStart;
+ return System.currentTimeMillis() - sortStart;
+ }
+ return 0;
+ }
+ };
+
+ final ThreadPoolExecutor threadPool;
+ private Instance instance;
+
+ public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
+ this.instance = instance;
+ this.fs = fs;
+ this.conf = conf;
+ int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
+ this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+ }
+
+ public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
+ final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
+ final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ zoo.mkdirs(path);
+ List<String> children = zoo.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ switch (event.getType()) {
+ case NodeChildrenChanged:
+ if (event.getPath().equals(path))
+ try {
+ attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
+ } catch (KeeperException e) {
+ log.error("Unable to get recovery information", e);
+ } catch (InterruptedException e) {
+ log.info("Interrupted getting recovery information", e);
+ }
+ else
+ log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
+ break;
+ case NodeCreated:
+ case NodeDataChanged:
+ case NodeDeleted:
+ case None:
+ log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+ break;
+
+ }
+ }
+ });
+ attemptRecoveries(zoo, serverName, path, children);
+ Random r = new Random();
+ // Add a little jitter to avoid all the tservers slamming zookeeper at once
+ SimpleTimer.getInstance().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
+ } catch (KeeperException e) {
+ log.error("Unable to get recovery information", e);
+ } catch (InterruptedException e) {
+ log.info("Interrupted getting recovery information", e);
+ }
+ }
+ }, r.nextInt(1000), 60 * 1000);
+ }
+
+ private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, String path, List<String> children) {
+ if (children.size() == 0)
+ return;
+ log.info("Zookeeper references " + children.size() + " recoveries, attempting locks");
+ Random random = new Random();
+ Collections.shuffle(children, random);
+ try {
+ for (String child : children) {
+ final String childPath = path + "/" + child;
+ log.debug("Attempting to lock " + child);
+ ZooLock lock = new ZooLock(childPath);
+ if (lock.tryLock(new LockWatcher() {
+ @Override
+ public void lostLock(LockLossReason reason) {
+ log.info("Ignoring lost lock event, reason " + reason);
+ }
+ }, serverName.getBytes())) {
+ // Great... we got the lock, but maybe we're too busy
+ if (threadPool.getQueue().size() > 1) {
+ lock.unlock();
+ log.debug("got the lock, but thread pool is busy; released the lock on " + child);
+ continue;
+ }
+ log.info("got lock for " + child);
+ byte[] contents = zoo.getData(childPath, null);
+ String destination = Constants.getRecoveryDir(conf) + "/" + child;
+ startSort(new String(contents), destination, new LogSortNotifier() {
+ @Override
+ public void notice(String name, long bytes, int parts, long milliseconds, String error) {
+ log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
+ try {
+ zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
+ } catch (Exception e) {
+ log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
+ }
+ }
+ });
+ } else {
+ log.info("failed to get the lock " + child);
+ }
+ }
+ } catch (Throwable t) {
+ log.error("Unexpected error", t);
+ }
+ }
+
+ public interface LogSortNotifier {
+ public void notice(String name, long bytes, int parts, long milliseconds, String error);
+ }
+
+ private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
+ log.info("Copying " + src + " to " + dest);
+ fs.delete(new Path(dest), true);
+ Path srcPath = new Path(src);
+ synchronized (currentWork) {
+ Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
+ if (!currentWork.containsKey(srcPath.getName())) {
+ threadPool.execute(work);
+ currentWork.put(srcPath.getName(), work);
+ }
+ }
+ }
+
+ public List<RecoveryStatus> getLogSorts() {
+ List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
+ synchronized (currentWork) {
+ for (Entry<String,Work> entries : currentWork.entrySet()) {
+ RecoveryStatus status = new RecoveryStatus();
+ status.name = entries.getKey();
+ try {
+ status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
+ } catch (IOException ex) {
+ log.warn("Error getting bytes read");
+ }
+ status.runtime = (int) entries.getValue().getSortTime();
+ result.add(status);
+ }
+ return result;
+ }
+ }
+}
Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Tue Jun 5 13:18:22 2012
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -35,13 +34,12 @@ import org.apache.accumulo.core.conf.Pro
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.tabletserver.Tablet;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
import org.apache.accumulo.server.tabletserver.TabletServer;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
import org.apache.log4j.Logger;
/**
@@ -61,7 +59,7 @@ public class TabletServerLogger {
private final TabletServer tserver;
// The current log set: always updated to a new set with every change of loggers
- private final List<RemoteLogger> loggers = new ArrayList<RemoteLogger>();
+ private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
// The current generation of logSet.
// Because multiple threads can be using a log set at one time, a log
@@ -134,7 +132,7 @@ public class TabletServerLogger {
this.maxSize = maxSize;
}
- private int initializeLoggers(final List<RemoteLogger> copy) throws IOException {
+ private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
final int[] result = {-1};
testLockAndRun(logSetLock, new TestCallWithWriteLock() {
boolean test() {
@@ -165,8 +163,8 @@ public class TabletServerLogger {
public void getLoggers(Set<String> loggersOut) {
logSetLock.readLock().lock();
try {
- for (RemoteLogger logger : loggers) {
- loggersOut.add(logger.getLogger());
+ for (DfsLogger logger : loggers) {
+ loggersOut.add(logger.toString());
}
} finally {
logSetLock.readLock().unlock();
@@ -183,35 +181,13 @@ public class TabletServerLogger {
}
try {
- while (true) {
- Set<String> loggerAddresses = tserver.getLoggers();
- if (!loggerAddresses.isEmpty()) {
- for (String logger : loggerAddresses) {
- try {
- loggers.add(new RemoteLogger(logger, UUID.randomUUID(), tserver.getSystemConfiguration()));
- } catch (LoggerClosedException t) {
- close();
- break;
- } catch (Exception t) {
- close();
- log.warn("Unable to connect to " + logger + ": " + t);
- break;
- }
- }
-
- if (loggers.size() == loggerAddresses.size())
- break;
- if (loggers.size() > 0) {
- // something is screwy, loggers.size() should be 0 or loggerAddresses.size()..
- throw new RuntimeException("Unexpected number of loggers " + loggers.size() + " " + loggerAddresses.size());
- }
- }
- UtilWaitThread.sleep(1000);
- }
+ DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+ alog.open(tserver.getClientAddressString());
+ loggers.add(alog);
logSetId.incrementAndGet();
return;
} catch (Exception t) {
- throw new IOException(t);
+ throw new RuntimeException(t);
}
}
@@ -229,13 +205,13 @@ public class TabletServerLogger {
throw new IllegalStateException("close should be called with write lock held!");
}
try {
- for (RemoteLogger logger : loggers) {
+ for (DfsLogger logger : loggers) {
try {
logger.close();
- } catch (LoggerClosedException ex) {
- // expected
+ } catch (DfsLogger.LogClosedException ex) {
+ // ignore
} catch (Throwable ex) {
- log.error("Unable to cleanly close logger " + logger.getLogger() + ": " + ex);
+ log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
}
}
loggers.clear();
@@ -246,7 +222,7 @@ public class TabletServerLogger {
}
interface Writer {
- LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
+ LoggerOperation write(DfsLogger logger, int seq) throws Exception;
}
private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -265,7 +241,7 @@ public class TabletServerLogger {
while (!success) {
try {
// get a reference to the loggers that no other thread can touch
- ArrayList<RemoteLogger> copy = new ArrayList<RemoteLogger>();
+ ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
currentLogSet = initializeLoggers(copy);
// add the logger to the log set for the memory in the tablet,
@@ -294,7 +270,7 @@ public class TabletServerLogger {
if (seq < 0)
throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
- for (RemoteLogger wal : copy) {
+ for (DfsLogger wal : copy) {
LoggerOperation lop = writer.write(wal, seq);
if (lop != null)
queuedOperations.add(lop);
@@ -307,13 +283,11 @@ public class TabletServerLogger {
// double-check: did the log set change?
success = (currentLogSet == logSetId.get());
}
+ } catch (DfsLogger.LogClosedException ex) {
+ log.debug("Logs closed while writing, retrying " + (attempt + 1));
} catch (Exception t) {
- if (attempt == 0) {
- log.info("Log write failed: another thread probably closed the log");
- } else {
- log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
- UtilWaitThread.sleep(100);
- }
+ log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+ UtilWaitThread.sleep(100);
} finally {
attempt++;
}
@@ -356,7 +330,7 @@ public class TabletServerLogger {
return -1;
return write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
return null;
}
@@ -368,7 +342,7 @@ public class TabletServerLogger {
return -1;
int seq = write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
return logger.log(tabletSeq, commitSession.getLogId(), m);
}
});
@@ -388,7 +362,7 @@ public class TabletServerLogger {
int seq = write(loggables.keySet(), false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
@@ -419,7 +393,7 @@ public class TabletServerLogger {
int seq = write(commitSession, true, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
return null;
}
@@ -435,7 +409,7 @@ public class TabletServerLogger {
return -1;
write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
return null;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Tue Jun 5 13:18:22 2012
@@ -90,9 +90,6 @@ public class GetMasterStats {
out(2, "Time Difference %.1f", ((now - server.lastContact) / 1000.));
out(2, "Total Records %d", summary.recs);
out(2, "Lookups %d", server.lookups);
- out(2, "Loggers %d", server.loggers.size());
- for (String logger : server.loggers)
- out(3, "Logger %s", logger);
if (server.holdTime > 0)
out(2, "Hold Time %d", server.holdTime);
if (server.tableMap != null && server.tableMap.size() > 0) {
@@ -111,16 +108,12 @@ public class GetMasterStats {
out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : info.minor.queued);
}
}
- }
- }
- if (stats.recovery != null && stats.recovery.size() > 0) {
- out(0, "Recovery");
- for (RecoveryStatus r : stats.recovery) {
- out(1, "Log Server %s", r.host);
- out(1, "Log Name %s", r.name);
- out(1, "Map Progress: %.2f%%", r.mapProgress * 100);
- out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100);
- out(1, "Time running: %s", r.runtime / 1000.);
+ out(2, "Recoveries %d", server.logSorts.size());
+ for (RecoveryStatus sort : server.logSorts) {
+ out(3, "File %s", sort.name);
+ out(3, "Progress %.2f%%", sort.progress * 100);
+ out(3, "Time running %s", sort.runtime / 1000.);
+ }
}
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java Tue Jun 5 13:18:22 2012
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
-import org.apache.accumulo.server.logger.IdentityReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -135,7 +134,7 @@ public class RunTests extends Configured
job.setOutputValueClass(Text.class);
// don't do anything with the results (yet) a summary would be nice
- job.setReducerClass(IdentityReducer.class);
+ job.setNumReduceTasks(0);
// submit the job
log.info("Starting tests");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Tue Jun 5 13:18:22 2012
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.accumulo.cloudtrace.thrift.TInfo;
import org.apache.accumulo.core.client.Instance;
@@ -164,9 +163,6 @@ public class NullTserver {
public void unloadTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
@Override
- public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {}
-
- @Override
public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveScan>();
}
@@ -188,6 +184,16 @@ public class NullTserver {
public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
}
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+ * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+ */
+ @Override
+ public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+ }
}
public static void main(String[] args) throws Exception {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Tue Jun 5 13:18:22 2012
@@ -19,11 +19,7 @@ package org.apache.accumulo.server.test.
import java.net.InetAddress;
import java.util.Properties;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
@@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.server.test.randomwalk.State;
import org.apache.accumulo.server.test.randomwalk.Test;
import org.apache.hadoop.fs.FileSystem;
public class Setup extends Test {
- private static final int CORE_POOL_SIZE = 8;
- private static final int MAX_POOL_SIZE = CORE_POOL_SIZE;
+ private static final int MAX_POOL_SIZE = 8;
static String tableName = null;
@Override
@@ -67,14 +61,7 @@ public class Setup extends Test {
state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
BulkPlusOne.counter.set(0l);
- BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
- ThreadFactory factory = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Daemon(new LoggingRunnable(log, r));
- }
- };
- ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory);
+ ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool");
state.set("pool", e);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java Tue Jun 5 13:18:22 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab
// If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually
public class TraceFileSystem extends FileSystem {
+
@Override
public void setConf(Configuration conf) {
Span span = Trace.start("setConf");
@@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil
this.impl = impl;
}
+ public FileSystem getImplementation() {
+ return impl;
+ }
+
@Override
public URI getUri() {
Span span = Trace.start("getUri");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Tue Jun 5 13:18:22 2012
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.conf.S
public class AddressUtil {
static public InetSocketAddress parseAddress(String address, Property portDefaultProperty) {
- final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
+ final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(portDefaultProperty);
return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Tue Jun 5 13:18:22 2012
@@ -353,7 +353,6 @@ public class Initialize {
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
- zoo.putPersistentData(zkInstanceRoot + Constants.ZLOGGERS, new byte[0], NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Tue Jun 5 13:18:22 2012
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
-import java.util.TreeSet;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
@@ -67,7 +66,6 @@ import org.apache.accumulo.core.util.Cac
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.StringUtil;
-import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -769,11 +767,7 @@ public class MetadataTable extends org.a
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZROOT_TABLET_WALOGS;
}
- public static void addLogEntries(AuthInfo credentials, List<LogEntry> entries, ZooLock zooLock) {
- if (entries.size() == 0)
- return;
- // entries should be a complete log set, so we should only need to write the first entry
- LogEntry entry = entries.get(0);
+ public static void addLogEntry(AuthInfo credentials, LogEntry entry, ZooLock zooLock) {
if (entry.extent.isRootTablet()) {
String root = getZookeeperLogLocation();
while (true) {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Tue Jun 5 13:18:22 2012
@@ -25,18 +25,14 @@ import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.TBufferedSocket;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -205,7 +201,7 @@ public class TServerUtils {
}
}
- public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, int numThreads,
+ public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
long timeBetweenThreadChecks) throws TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
THsHaServer.Args options = new THsHaServer.Args(transport);
@@ -214,21 +210,8 @@ public class TServerUtils {
/*
* Create our own very special thread pool.
*/
- // 1. name the threads for client connections
- ThreadFactory factory = new ThreadFactory() {
- AtomicInteger threadId = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(new LoggingRunnable(log, r), "ClientPool-" + threadId.getAndIncrement());
- }
- };
- // 2. allow tasks to queue, potentially forever
- final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- // 3. keep the number of threads small
- final int minimumThreadPoolSize = numThreads;
- final ThreadPoolExecutor pool = new ThreadPoolExecutor(minimumThreadPoolSize, minimumThreadPoolSize, 10L, TimeUnit.SECONDS, queue, factory);
- // 4. periodically adjust the number of threads we need by checking how busy our threads are
+ final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
SimpleTimer.getInstance().schedule(new TimerTask() {
@Override
public void run() {
@@ -239,7 +222,7 @@ public class TServerUtils {
pool.setCorePoolSize(larger);
} else {
if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
- int smaller = Math.max(minimumThreadPoolSize, pool.getCorePoolSize() - 1);
+ int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
if (smaller != pool.getCorePoolSize()) {
// there is a race condition here... the active count could be higher by the time
// we decrease the core pool size... so the active count could end up higher than
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java Tue Jun 5 13:18:22 2012
@@ -41,7 +41,6 @@ public class ZooZap {
boolean zapMaster = false;
boolean zapTservers = false;
- boolean zapLoggers = false;
boolean zapTracers = false;
if (args.length == 0) {
@@ -54,8 +53,6 @@ public class ZooZap {
zapTservers = true;
} else if (args[i].equals("-master")) {
zapMaster = true;
- } else if (args[i].equals("-loggers")) {
- zapLoggers = true;
} else if (args[i].equals("-tracers")) {
zapTracers = true;
} else if (args[i].equals("-verbose")) {
@@ -98,24 +95,19 @@ public class ZooZap {
}
}
- if (zapLoggers) {
- String loggersPath = Constants.ZROOT + "/" + iid + Constants.ZLOGGERS;
- zapDirectory(zoo, loggersPath);
- }
-
if (zapTracers) {
- String loggersPath = Constants.ZROOT + "/" + iid + Constants.ZTRACERS;
- zapDirectory(zoo, loggersPath);
+ String path = Constants.ZROOT + "/" + iid + Constants.ZTRACERS;
+ zapDirectory(zoo, path);
}
}
- private static void zapDirectory(IZooReaderWriter zoo, String loggersPath) {
+ private static void zapDirectory(IZooReaderWriter zoo, String path) {
try {
- List<String> children = zoo.getChildren(loggersPath);
+ List<String> children = zoo.getChildren(path);
for (String child : children) {
- message("Deleting " + loggersPath + "/" + child + " from zookeeper");
- zoo.recursiveDelete(loggersPath + "/" + child, NodeMissingPolicy.SKIP);
+ message("Deleting " + path + "/" + child + " from zookeeper");
+ zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
}
} catch (Exception e) {
e.printStackTrace();
Modified: accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java (original)
+++ accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java Tue Jun 5 13:18:22 2012
@@ -52,8 +52,6 @@ public class Main {
runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.gc.SimpleGarbageCollector");
} else if (args[0].equals("monitor")) {
runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.monitor.Monitor");
- } else if (args[0].equals("logger")) {
- runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.logger.LogService");
} else if (args[0].equals("tracer")) {
runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.trace.TraceServer");
} else if (args[0].equals("classpath")) {
Modified: accumulo/trunk/test/system/continuous/agitator.pl
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/continuous/agitator.pl?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/test/system/continuous/agitator.pl (original)
+++ accumulo/trunk/test/system/continuous/agitator.pl Tue Jun 5 13:18:22 2012
@@ -74,27 +74,8 @@ while(1){
$t = strftime "%Y%m%d %H:%M:%S", localtime;
$rn = rand(1);
- $kill_tserver = 0;
- $kill_logger = 0;
- if($rn <.33){
- $kill_tserver = 1;
- $kill_logger = 1;
- }elsif($rn < .66){
- $kill_tserver = 1;
- $kill_logger = 0;
- }else{
- $kill_tserver = 0;
- $kill_logger = 1;
- }
-
- print STDERR "$t Killing $server $kill_tserver $kill_logger\n";
- if($kill_tserver) {
- system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
- }
-
- if($kill_logger) {
- system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" logger KILL");
- }
+ print STDERR "$t Killing $server\n";
+ system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
}
sleep($sleep2 * 60);