You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:20:28 UTC
[2/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table
updates
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index afd3454..aeb73b4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
@@ -30,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,6 +45,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
@@ -147,8 +148,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.GarbageCollectionLogger;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -1440,6 +1441,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
+
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
@@ -1500,6 +1502,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
final AssignmentHandler ah = new AssignmentHandler(extent);
// final Runnable ah = new LoggingRunnable(log, );
// Root tablet assignment must take place immediately
+
if (extent.isRootTablet()) {
new Daemon("Root Tablet Assignment") {
@Override
@@ -1692,66 +1695,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
@Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
- String myname = getClientAddressString();
- myname = myname.replace(':', '+');
- Set<String> loggers = new HashSet<String>();
- logger.getLogFiles(loggers);
- Set<String> loggerUUIDs = new HashSet<String>();
- for (String logger : loggers)
- loggerUUIDs.add(new Path(logger).getName());
-
- nextFile: for (String filename : filenames) {
- String uuid = new Path(filename).getName();
- // skip any log we're currently using
- if (loggerUUIDs.contains(uuid))
- continue nextFile;
-
- List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
- synchronized (onlineTablets) {
- onlineTabletsCopy.addAll(onlineTablets.values());
- }
- for (Tablet tablet : onlineTabletsCopy) {
- for (String current : tablet.getCurrentLogFiles()) {
- if (current.contains(uuid)) {
- log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
- continue nextFile;
- }
- }
- }
-
- try {
- Path source = new Path(filename);
- if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
- Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
- fs.mkdirs(walogArchive);
- Path dest = new Path(walogArchive, source.getName());
- log.info("Archiving walog " + source + " to " + dest);
- if (!fs.rename(source, dest))
- log.error("rename is unsuccessful");
- } else {
- log.info("Deleting walog " + filename);
- Path sourcePath = new Path(filename);
- if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath))
- && !fs.deleteRecursively(sourcePath))
- log.warn("Failed to delete walog " + source);
- for (String recovery : ServerConstants.getRecoveryDirs()) {
- Path recoveryPath = new Path(recovery, source.getName());
- try {
- if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
- log.info("Deleted any recovery log " + filename);
- } catch (FileNotFoundException ex) {
- // ignore
- }
- }
- }
- } catch (IOException e) {
- log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
- }
- }
- }
-
- @Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
checkPermission(credentials, null, "getActiveCompactions");
@@ -1772,14 +1715,20 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
- Set<String> logs = new HashSet<String>();
- logger.getLogFiles(logs);
- return new ArrayList<String>(logs);
+ String log = logger.getLogFile();
+ return Collections.singletonList(log);
+ }
+
+  @Override
+  public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+    // This call no longer removes anything: it only warns that some caller is still asking the tablet server to remove logs.
+    final String restartHint = "This is probably because your file Garbage Collector is an older version than your tablet servers.\n"
+        + "Restart your file Garbage Collector.";
+    log.warn("Garbage collector is attempting to remove logs through the tablet server");
+    log.warn(restartHint);
}
}
private class SplitRunner implements Runnable {
- private Tablet tablet;
+ private final Tablet tablet;
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
@@ -2033,7 +1982,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
log.error("Unexpected error ", e);
}
log.debug("Unassigning " + tls);
- TabletStateStore.unassign(TabletServer.this, tls);
+ TabletStateStore.unassign(TabletServer.this, tls, null);
} catch (DistributedStoreException ex) {
log.warn("Unable to update storage", ex);
} catch (KeeperException e) {
@@ -2243,29 +2192,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
- public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
- if (!this.onlineTablets.containsKey(extent)) {
- log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline");
- // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs,
- // if it doesn't, we'll need to do the same recovery with the old files.
- return;
- }
-
- log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
- long now = RelativeTime.currentTimeMillis();
- List<String> logSet = new ArrayList<String>();
- for (DfsLogger log : logs)
- logSet.add(log.getFileName());
- LogEntry entry = new LogEntry();
- entry.extent = extent;
- entry.tabletId = id;
- entry.timestamp = now;
- entry.server = logs.get(0).getLogger();
- entry.filename = logs.get(0).getFileName();
- entry.logSet = logSet;
- MetadataTableUtil.addLogEntry(this, entry, getLock());
- }
-
private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName)
throws UnknownHostException {
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -2984,6 +2910,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions.incrementAndGet();
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
+ markUnusedWALs();
}
public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -3002,14 +2929,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
});
for (LogEntry entry : sorted) {
Path recovery = null;
- for (String log : entry.logSet) {
- Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
- finished = SortedLogState.getFinishedMarkerPath(finished);
- TabletServer.log.info("Looking for " + finished);
- if (fs.exists(finished)) {
- recovery = finished.getParent();
- break;
- }
+ Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename));
+ finished = SortedLogState.getFinishedMarkerPath(finished);
+ TabletServer.log.info("Looking for " + finished);
+ if (fs.exists(finished)) {
+ recovery = finished.getParent();
}
if (recovery == null)
throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
@@ -3046,7 +2970,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
public Collection<Tablet> getOnlineTablets() {
- return Collections.unmodifiableCollection(onlineTablets.values());
+ synchronized (onlineTablets) {
+ return new ArrayList<Tablet>(onlineTablets.values());
+ }
}
public VolumeManager getFileSystem() {
@@ -3072,4 +2998,62 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public SecurityOperation getSecurityOperation() {
return security;
}
+
+ // avoid unnecessary redundant markings to meta
+ final ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+ final Object levelLocks[] = new Object[TabletLevel.values().length];
+ {
+ for (int i = 0; i < levelLocks.length; i++) {
+ levelLocks[i] = new Object();
+ }
+ }
+
+
+ // remove any meta entries after a rolled log is no longer referenced
+ Set<DfsLogger> closedLogs = new HashSet<>();
+
+  /**
+   * Marks closed write-ahead logs that no online tablet lists among its current log files as unused, via
+   * MetadataTableUtil.markLogUnused, and then stops tracking them in closedLogs. Invoked from minorCompactionFinished().
+   * An AccumuloException from the metadata update is logged and the logs stay in closedLogs, so they are reconsidered on the next call.
+   */
+  private void markUnusedWALs() {
+    // work on a snapshot so the closedLogs monitor is not held while tablets and the metadata are consulted
+    final Set<DfsLogger> unreferenced;
+    synchronized (closedLogs) {
+      unreferenced = new HashSet<>(closedLogs);
+    }
+    for (Tablet online : getOnlineTablets()) {
+      unreferenced.removeAll(online.getCurrentLogFiles());
+    }
+    final Set<Path> unusedPaths = new HashSet<>();
+    for (DfsLogger wal : unreferenced) {
+      unusedPaths.add(wal.getPath());
+    }
+    try {
+      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), unusedPaths);
+      // only forget the logs once the metadata update has gone through
+      synchronized (closedLogs) {
+        closedLogs.removeAll(unreferenced);
+      }
+    } catch (AccumuloException ex) {
+      log.info(ex.toString(), ex);
+    }
+  }
+
+  /**
+   * Writes a new-log marker for the given write-ahead log and tablet level through MetadataTableUtil.addNewLogMarker, unless this
+   * log is already recorded in metadataTableLogs as marked for that level. The ROOT level is written on every call.
+   *
+   * @param copy
+   *          the write-ahead log that is about to be used
+   * @param level
+   *          the level of the tablet using the log
+   */
+  public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) {
+    // serialize the updates to the metadata per level: avoids updating the level more than once.
+    // Updating one level may cause updates to other levels, so we need to release the lock on metadataTableLogs.
+    synchronized (levelLocks[level.ordinal()]) {
+      // null means no entry existed for this log; the set just inserted already contains this level
+      EnumSet<TabletLevel> marked = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+      if (marked == null || !marked.contains(level) || level == TabletLevel.ROOT) {
+        log.info("Writing log marker for level " + level + " " + copy.getFileName());
+        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level);
+      }
+      // Update the set already in hand rather than looking it up again: walogClosed() removes the entry without holding a
+      // level lock, so a second metadataTableLogs.get(copy) could return null and fail with a NullPointerException.
+      if (marked != null) {
+        marked.add(level);
+      }
+    }
+  }
+
+  public void walogClosed(DfsLogger currentLog) { // invoked by TabletServerLogger.close() for the log it has just closed
+    metadataTableLogs.remove(currentLog); // drop the record of which levels were marked for this log
+    synchronized (closedLogs) { // closedLogs is a plain HashSet; the code shown here always locks it before access
+      closedLogs.add(currentLog); // markUnusedWALs() draws its candidates from this set
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8512690..cd7ce08 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -72,7 +72,7 @@ import com.google.common.base.Optional;
* Wrap a connection to a logger.
*
*/
-public class DfsLogger {
+public class DfsLogger implements Comparable<DfsLogger> {
public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
@@ -371,6 +371,7 @@ public class DfsLogger {
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
+ log.debug("Address is " + address);
String logger = Joiner.on("+").join(address.split(":"));
log.debug("DfsLogger.open() begin");
@@ -463,7 +464,11 @@ public class DfsLogger {
}
public String getFileName() {
- return logPath.toString();
+ return logPath;
+ }
+
+  public Path getPath() { // same log location that getFileName() returns as a String, wrapped in a Path
+    return new Path(logPath); // a new Path object is built on every call
}
public void close() throws IOException {
@@ -609,4 +614,9 @@ public class DfsLogger {
return Joiner.on(":").join(parts[parts.length - 2].split("[+]"));
}
+  /**
+   * Orders loggers by comparing the strings returned by {@link #getFileName()}.
+   */
+  @Override
+  public int compareTo(DfsLogger o) {
+    final String thisName = getFileName();
+    final String otherName = o.getFileName();
+    return thisName.compareTo(otherName);
+  }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 37882cd..ab3dea2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -180,7 +180,7 @@ public class SortedLogRecovery {
// find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
// for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
while (reader.next(key, value)) {
- // LogReader.printEntry(entry);
+ // log.debug("Event " + key.event + " tablet " + key.tablet);
if (key.event != DEFINE_TABLET)
break;
if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
@@ -209,7 +209,7 @@ public class SortedLogRecovery {
if (lastStartToFinish.compactionStatus == Status.INITIAL)
lastStartToFinish.compactionStatus = Status.COMPLETE;
if (key.seq <= lastStartToFinish.lastStart)
- throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+ throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
lastStartToFinish.update(fileno, key.seq);
// Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
@@ -218,7 +218,7 @@ public class SortedLogRecovery {
lastStartToFinish.update(-1);
} else if (key.event == COMPACTION_FINISH) {
if (key.seq <= lastStartToFinish.lastStart)
- throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+ throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
if (lastStartToFinish.compactionStatus == Status.INITIAL)
lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
@@ -249,8 +249,6 @@ public class SortedLogRecovery {
break;
if (key.tid != tid)
break;
- // log.info("Replaying " + key);
- // log.info(value);
if (key.event == MUTATION) {
mr.receive(value.mutations.get(0));
} else if (key.event == MANY_MUTATIONS) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 1d385d9..bc77ffb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -21,14 +21,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,7 +39,9 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.StatusUtil;
@@ -72,20 +76,22 @@ public class TabletServerLogger {
private final TabletServer tserver;
- // The current log set: always updated to a new set with every change of loggers
- private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
+ // The current logger
+ private DfsLogger currentLog = null;
+ private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>();
+ private ThreadPoolExecutor nextLogMaker;
- // The current generation of logSet.
- // Because multiple threads can be using a log set at one time, a log
+ // The current generation of logs.
+ // Because multiple threads can be using a log at one time, a log
// failure is likely to affect multiple threads, who will all attempt to
- // create a new logSet. This will cause many unnecessary updates to the
+ // create a new log. This will cause many unnecessary updates to the
// metadata table.
// We'll use this generational counter to determine if another thread has
- // already fetched a new logSet.
- private AtomicInteger logSetId = new AtomicInteger();
+ // already fetched a new log.
+ private final AtomicInteger logId = new AtomicInteger();
// Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
- private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
private final AtomicInteger seqGen = new AtomicInteger();
@@ -146,62 +152,66 @@ public class TabletServerLogger {
this.flushCounter = flushCounter;
}
- private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
- final int[] result = {-1};
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
+ final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>();
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
- copy.clear();
- copy.addAll(loggers);
- if (!loggers.isEmpty())
- result[0] = logSetId.get();
- return loggers.isEmpty();
+ result.set(currentLog);
+ if (currentLog != null)
+ logIdOut.set(logId.get());
+ return currentLog == null;
}
@Override
void withWriteLock() throws IOException {
try {
- createLoggers();
- copy.clear();
- copy.addAll(loggers);
- if (copy.size() > 0)
- result[0] = logSetId.get();
+ createLogger();
+ result.set(currentLog);
+ if (currentLog != null)
+ logIdOut.set(logId.get());
else
- result[0] = -1;
+ logIdOut.set(-1);
} catch (IOException e) {
log.error("Unable to create loggers", e);
}
}
});
- return result[0];
+ return result.get();
}
- public void getLogFiles(Set<String> loggersOut) {
- logSetLock.readLock().lock();
+ public String getLogFile() {
+ logIdLock.readLock().lock();
try {
- for (DfsLogger logger : loggers) {
- loggersOut.add(logger.getFileName());
- }
+ return currentLog.getFileName();
} finally {
- logSetLock.readLock().unlock();
+ logIdLock.readLock().unlock();
}
}
- synchronized private void createLoggers() throws IOException {
- if (!logSetLock.isWriteLockedByCurrentThread()) {
+ synchronized private void createLogger() throws IOException {
+ if (!logIdLock.isWriteLockedByCurrentThread()) {
throw new IllegalStateException("createLoggers should be called with write lock held!");
}
- if (loggers.size() != 0) {
- throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
+ if (currentLog != null) {
+ throw new IllegalStateException("createLoggers should not be called when current log is set");
}
try {
- DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
- alog.open(tserver.getClientAddressString());
- loggers.add(alog);
- logSetId.incrementAndGet();
- return;
+ startLogMaker();
+ Object next = nextLog.take();
+ if (next instanceof Exception) {
+ throw (Exception)next;
+ }
+ if (next instanceof DfsLogger) {
+ currentLog = (DfsLogger)next;
+ logId.incrementAndGet();
+ log.info("Using next log " + currentLog.getFileName());
+ return;
+ } else {
+ throw new RuntimeException("Error: unexpected type seen: " + next);
+ }
} catch (Exception t) {
walErrors.put(System.currentTimeMillis(), "");
if (walErrors.size() >= HALT_AFTER_ERROR_COUNT) {
@@ -211,22 +221,63 @@ public class TabletServerLogger {
}
}
+  /**
+   * Starts, at most once, the single-threaded "WALog creator" task. The task keeps opening a new write-ahead log and offering it on
+   * nextLog, where createLogger() takes it. If creating or handing over a log fails, the exception itself is offered on nextLog instead.
+   */
+  private synchronized void startLogMaker() {
+    if (nextLogMaker != null) {
+      // the creator task has already been started
+      return;
+    }
+    nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+    final Runnable walCreator = new Runnable() {
+      @Override
+      public void run() {
+        while (!nextLogMaker.isShutdown()) {
+          try {
+            log.debug("Creating next WAL");
+            DfsLogger nextWal = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+            nextWal.open(tserver.getClientAddressString());
+            log.debug("Created next WAL " + nextWal.getFileName());
+            // nextLog is a SynchronousQueue, so each offer waits (up to 12 hours) for a thread to take the log
+            while (!nextLog.offer(nextWal, 12, TimeUnit.HOURS)) {
+              log.info("Our WAL was not used for 12 hours: " + nextWal.getFileName());
+            }
+          } catch (Exception failure) {
+            log.error("{}", failure.getMessage(), failure);
+            try {
+              // pass the failure to whichever thread is waiting for the next log
+              nextLog.offer(failure, 12, TimeUnit.HOURS);
+            } catch (InterruptedException ignored) {
+              // ignore
+            }
+          }
+        }
+      }
+    };
+    nextLogMaker.submit(walCreator);
+  }
+
+  /**
+   * Closes the current write-ahead log, if there is one, while holding the write lock of logIdLock.
+   *
+   * @throws IOException
+   *           if close() fails
+   */
+  public void resetLoggers() throws IOException {
+    final ReentrantReadWriteLock.WriteLock exclusive = logIdLock.writeLock();
+    exclusive.lock();
+    try {
+      close();
+    } finally {
+      exclusive.unlock();
+    }
+  }
+
synchronized private void close() throws IOException {
- if (!logSetLock.isWriteLockedByCurrentThread()) {
+ if (!logIdLock.isWriteLockedByCurrentThread()) {
throw new IllegalStateException("close should be called with write lock held!");
}
try {
- for (DfsLogger logger : loggers) {
+ if (null != currentLog) {
try {
- logger.close();
+ currentLog.close();
} catch (DfsLogger.LogClosedException ex) {
// ignore
} catch (Throwable ex) {
- log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex);
+ log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
+ } finally {
+ this.tserver.walogClosed(currentLog);
}
+ currentLog = null;
+ logSizeEstimate.set(0);
}
- loggers.clear();
- logSizeEstimate.set(0);
} catch (Throwable t) {
throw new IOException(t);
}
@@ -243,7 +294,7 @@ public class TabletServerLogger {
private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
// Work very hard not to lock this during calls to the outside world
- int currentLogSet = logSetId.get();
+ int currentLogId = logId.get();
int seq = -1;
int attempt = 1;
@@ -251,20 +302,22 @@ public class TabletServerLogger {
while (!success) {
try {
// get a reference to the loggers that no other thread can touch
- ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
- currentLogSet = initializeLoggers(copy);
+ DfsLogger copy = null;
+ AtomicInteger currentId = new AtomicInteger(-1);
+ copy = initializeLoggers(currentId);
+ currentLogId = currentId.get();
// add the logger to the log set for the memory in the tablet,
// update the metadata table if we've never used this tablet
- if (currentLogSet == logSetId.get()) {
+ if (currentLogId == logId.get()) {
for (CommitSession commitSession : sessions) {
if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
try {
// Scribble out a tablet definition and then write to the metadata table
defineTablet(commitSession);
- if (currentLogSet == logSetId.get())
- tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
+ if (currentLogId == logId.get())
+ tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent()));
} finally {
commitSession.finishUpdatingLogsUsed();
}
@@ -272,39 +325,29 @@ public class TabletServerLogger {
// Need to release
KeyExtent extent = commitSession.getExtent();
if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
- Set<String> logs = new HashSet<String>();
- for (DfsLogger logger : copy) {
- logs.add(logger.getFileName());
- }
- Status status = StatusUtil.fileCreated(System.currentTimeMillis());
- log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs);
+ Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis());
+ log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName());
// Got some new WALs, note this in the metadata table
- ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status);
+ ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status);
}
}
}
}
// Make sure that the logs haven't changed out from underneath our copy
- if (currentLogSet == logSetId.get()) {
+ if (currentLogId == logId.get()) {
// write the mutation to the logs
seq = seqGen.incrementAndGet();
if (seq < 0)
throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
- ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
- for (DfsLogger wal : copy) {
- LoggerOperation lop = writer.write(wal, seq);
- if (lop != null)
- queuedOperations.add(lop);
- }
-
- for (LoggerOperation lop : queuedOperations) {
+ LoggerOperation lop = writer.write(copy, seq);
+ if (lop != null) {
lop.await();
}
// double-check: did the log set change?
- success = (currentLogSet == logSetId.get());
+ success = (currentLogId == logId.get());
}
} catch (DfsLogger.LogClosedException ex) {
log.debug("Logs closed while writing, retrying " + attempt);
@@ -319,13 +362,13 @@ public class TabletServerLogger {
// Some sort of write failure occurred. Grab the write lock and reset the logs.
// But since multiple threads will attempt it, only attempt the reset when
// the logs haven't changed.
- final int finalCurrent = currentLogSet;
+ final int finalCurrent = currentLogId;
if (!success) {
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
- return finalCurrent == logSetId.get();
+ return finalCurrent == logId.get();
}
@Override
@@ -338,7 +381,7 @@ public class TabletServerLogger {
}
// if the log gets too big, reset it .. grab the write lock first
logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
return logSizeEstimate.get() > maxSize;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index d908f1d..dee705c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.tserver.tablet;
-import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.data.Mutation;
@@ -86,7 +85,7 @@ public class CommitSession {
return committer;
}
- public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+ public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) {
return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index db1b418..ab15ccc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -424,7 +424,9 @@ class DatafileManager {
if (log.isDebugEnabled()) {
log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);
}
- ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
+ for (String logFile : logFileOnly) {
+ ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength());
+ }
}
} finally {
tablet.finishClearingUnusedLogs();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 1f4625b..17864be 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -37,6 +37,7 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -200,7 +201,7 @@ public class Tablet implements TabletCommitter {
}
// stores info about user initiated major compaction that is waiting on a minor compaction to finish
- private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
+ private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
static enum CompactionState {
WAITING_TO_START, IN_PROGRESS
@@ -627,8 +628,8 @@ public class Tablet implements TabletCommitter {
// the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress.
Status status = StatusUtil.openWithUnknownLength();
for (LogEntry logEntry : logEntries) {
- log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
- ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status);
+ log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status));
+ ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status);
}
}
@@ -640,11 +641,9 @@ public class Tablet implements TabletCommitter {
}
}
// make some closed references that represent the recovered logs
- currentLogs = new HashSet<DfsLogger>();
+ currentLogs = new ConcurrentSkipListSet<DfsLogger>();
for (LogEntry logEntry : logEntries) {
- for (String log : logEntry.logSet) {
- currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
- }
+ currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString()));
}
log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries()
@@ -935,7 +934,9 @@ public class Tablet implements TabletCommitter {
long count = 0;
+ String oldName = Thread.currentThread().getName();
try {
+ Thread.currentThread().setName("Minor compacting " + this.extent);
Span span = Trace.start("write");
CompactionStats stats;
try {
@@ -966,6 +967,7 @@ public class Tablet implements TabletCommitter {
failed = true;
throw new RuntimeException(e);
} finally {
+ Thread.currentThread().setName(oldName);
try {
getTabletMemory().finalizeMinC();
} catch (Throwable t) {
@@ -990,7 +992,7 @@ public class Tablet implements TabletCommitter {
private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
CommitSession oldCommitSession = getTabletMemory().prepareForMinC();
otherLogs = currentLogs;
- currentLogs = new HashSet<DfsLogger>();
+ currentLogs = new ConcurrentSkipListSet<DfsLogger>();
FileRef mergeFile = null;
if (mincReason != MinorCompactionReason.RECOVERY) {
@@ -2374,14 +2376,11 @@ public class Tablet implements TabletCommitter {
}
}
- private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
+ private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>();
- public synchronized Set<String> getCurrentLogFiles() {
- Set<String> result = new HashSet<String>();
- for (DfsLogger log : currentLogs) {
- result.add(log.getFileName());
- }
- return result;
+ // currentLogs may be updated while a tablet is otherwise locked
+ public Set<DfsLogger> getCurrentLogFiles() {
+ return new HashSet<DfsLogger>(currentLogs);
}
Set<String> beginClearingUnusedLogs() {
@@ -2440,13 +2439,13 @@ public class Tablet implements TabletCommitter {
// this lock is basically used to synchronize writing of log info to metadata
private final ReentrantLock logLock = new ReentrantLock();
- public synchronized int getLogCount() {
+ public int getLogCount() {
return currentLogs.size();
}
// don't release the lock if this method returns true for success; instead, the caller should clean up by calling finishUpdatingLogsUsed()
@Override
- public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
+ public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
boolean releaseLock = true;
@@ -2483,28 +2482,26 @@ public class Tablet implements TabletCommitter {
int numAdded = 0;
int numContained = 0;
- for (DfsLogger logger : more) {
- if (addToOther) {
- if (otherLogs.add(logger))
- numAdded++;
+ if (addToOther) {
+ if (otherLogs.add(more))
+ numAdded++;
- if (currentLogs.contains(logger))
- numContained++;
- } else {
- if (currentLogs.add(logger))
- numAdded++;
+ if (currentLogs.contains(more))
+ numContained++;
+ } else {
+ if (currentLogs.add(more))
+ numAdded++;
- if (otherLogs.contains(logger))
- numContained++;
- }
+ if (otherLogs.contains(more))
+ numContained++;
}
- if (numAdded > 0 && numAdded != more.size()) {
+ if (numAdded > 0 && numAdded != 1) {
// expect to add all or none
throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs);
}
- if (numContained > 0 && numContained != more.size()) {
+ if (numContained > 0 && numContained != 1) {
// expect to contain all or none
throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index c7e3a66..934ce20 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.tserver.tablet;
-import java.util.Collection;
import java.util.List;
import org.apache.accumulo.core.client.Durability;
@@ -38,7 +37,7 @@ public interface TabletCommitter {
/**
* If this method returns true, the caller must call {@link #finishUpdatingLogsUsed()} to clean up
*/
- boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
+ boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish);
void finishUpdatingLogsUsed();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
new file mode 100644
index 0000000..44058d3
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Exercises {@link LogEntry} construction, its accessors, and its byte and key/value round trips.
+ */
+public class LogEntryTest {
+
+  /**
+   * Builds one entry and checks its fields, its column family, qualifier and value, and that copies made through
+   * {@code toBytes}/{@code fromBytes} and {@code fromKeyValue} print the same as the original.
+   */
+  @Test
+  public void test() throws Exception {
+    KeyExtent extent = new KeyExtent(new Text("1"), null, new Text(""));
+    long ts = 12345678L;
+    String server = "localhost:1234";
+    String filename = "default/foo";
+    LogEntry entry = new LogEntry(extent, ts, server, filename);
+    assertEquals(extent, entry.extent);
+    assertEquals(server, entry.server);
+    assertEquals(filename, entry.filename);
+    assertEquals(ts, entry.timestamp);
+    assertEquals("1<; default/foo", entry.toString());
+    assertEquals(new Text("log"), entry.getColumnFamily());
+    assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier());
+    // round trip through the serialized byte form
+    LogEntry copy = LogEntry.fromBytes(entry.toBytes());
+    assertEquals(entry.toString(), copy.toString());
+    // rebuild from a key/value pair carrying the same row, family, qualifier and timestamp
+    Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo"));
+    key.setTimestamp(ts);
+    LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue());
+    assertEquals(entry.toString(), copy2.toString());
+    assertEquals(entry.timestamp, copy2.timestamp);
+    assertEquals("foo", entry.getUniqueID());
+    assertEquals("localhost:1234/default/foo", entry.getName());
+    // encode explicitly so the expected bytes do not depend on the platform default charset
+    assertEquals(new Value("default/foo".getBytes(UTF_8)), entry.getValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index d0de29f..1186c68 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -202,9 +202,6 @@ public class NullTserver {
}
@Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {}
-
- @Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveCompaction>();
}
@@ -231,6 +228,9 @@ public class NullTserver {
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
return null;
}
+
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { }
}
static class Opts extends Help {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
index 404a8fd..81e25cc 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -60,6 +60,11 @@ import com.google.common.net.HostAndPort;
public class ProxyDurabilityIT extends ConfigurableMacIT {
@Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
@@ -111,7 +116,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
assertEquals(0, count(tableName));
ConditionalWriterOptions cfg = new ConditionalWriterOptions();
- cfg.setDurability(Durability.LOG);
+ cfg.setDurability(Durability.SYNC);
String cwriter = client.createConditionalWriter(login, tableName, cfg);
ConditionalUpdates updates = new ConditionalUpdates();
updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
@@ -120,7 +125,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
assertEquals(1, count(tableName));
restartTServer();
- assertEquals(0, count(tableName));
+ assertEquals(1, count(tableName));
proxyServer.stop();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
index 25337b2..0dcdf42 100644
--- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
@@ -54,7 +54,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT {
@Override
public int defaultTimeoutSeconds() {
- return 60;
+ return 120;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
index f793925..8703f18 100644
--- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -20,25 +20,33 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.hadoop.io.Text;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class BalanceIT extends ConfigurableMacIT {
+public class BalanceIT extends AccumuloClusterIT {
+ private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
- @Test(timeout = 60 * 1000)
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
public void testBalance() throws Exception {
String tableName = getUniqueNames(1)[0];
Connector c = getConnector();
- System.out.println("Creating table");
+ log.info("Creating table");
c.tableOperations().create(tableName);
SortedSet<Text> splits = new TreeSet<Text>();
for (int i = 0; i < 10; i++) {
splits.add(new Text("" + i));
}
- System.out.println("Adding splits");
+ log.info("Adding splits");
c.tableOperations().addSplits(tableName, splits);
- System.out.println("Waiting for balance");
+ log.info("Waiting for balance");
c.instanceOperations().waitForBalance();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index f553be8..fcad293 100644
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -128,6 +128,7 @@ public class CleanWalIT extends AccumuloClusterIT {
private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
int count = 0;
for (Entry<Key,Value> entry : scanner) {
log.debug("Saw " + entry.getKey() + "=" + entry.getValue());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
index b7637a6..65be396 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1294,6 +1294,7 @@ public class ConditionalWriterIT extends AccumuloClusterIT {
conn.tableOperations().create(tableName);
DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
+ UtilWaitThread.sleep(1000);
Span root = Trace.on("traceTest");
ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
new file mode 100644
index 0000000..0324e4a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Checks that the number of WAL files on the file system is the same before and after a tablet server restart with the garbage collector running.
+ */
+public class GarbageCollectWALIT extends ConfigurableMacIT {
+
+  /**
+   * Configures one tablet server, a 5s ZooKeeper timeout, a 1s garbage collection start and delay, and the raw local file system.
+   */
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // "5s" is a duration, so it belongs on the ZooKeeper timeout property rather than on the ZooKeeper host list
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /**
+   * Counts the WAL files with the garbage collector stopped, restarts the tablet server with the collector running, and checks
+   * that the count is unchanged once the collector has had time to run.
+   */
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    // keep the garbage collector from running until the initial WAL count has been taken
+    String tableName = getUniqueNames(1)[0];
+    cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    // count the number of WALs in the filesystem
+    assertEquals(2, countWALsInFS(cluster));
+    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+    // read the whole metadata table; the entry count itself is discarded
+    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+    // let GC run
+    UtilWaitThread.sleep(3 * 5 * 1000);
+    assertEquals(2, countWALsInFS(cluster));
+  }
+
+  /**
+   * Recursively counts the non-directory entries under the {@code wal} directory of the cluster's Accumulo directory.
+   */
+  private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
+    int result = 0;
+    while (iterator.hasNext()) {
+      LocatedFileStatus next = iterator.next();
+      if (!next.isDirectory()) {
+        result++;
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index b78a311..27f1f69 100644
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.File;
-import java.util.Collections;
import java.util.UUID;
import org.apache.accumulo.core.client.BatchWriter;
@@ -27,6 +26,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -127,11 +127,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
String tableId = conn.tableOperations().tableIdMap().get(tableName);
Assert.assertNotNull("Table ID was null", tableId);
- LogEntry logEntry = new LogEntry();
- logEntry.server = "127.0.0.1:12345";
- logEntry.filename = emptyWalog.toURI().toString();
- logEntry.tabletId = 10;
- logEntry.logSet = Collections.singleton(logEntry.filename);
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString());
log.info("Taking {} offline", tableName);
conn.tableOperations().offline(tableName, true);
@@ -186,11 +182,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
String tableId = conn.tableOperations().tableIdMap().get(tableName);
Assert.assertNotNull("Table ID was null", tableId);
- LogEntry logEntry = new LogEntry();
- logEntry.server = "127.0.0.1:12345";
- logEntry.filename = partialHeaderWalog.toURI().toString();
- logEntry.tabletId = 10;
- logEntry.logSet = Collections.singleton(logEntry.filename);
+ LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString());
log.info("Taking {} offline", tableName);
conn.tableOperations().offline(tableName, true);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
deleted file mode 100644
index 6a9975c..0000000
--- a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.cluster.ClusterControl;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.FunctionalTestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// Verify that a recovery of a log without any mutations removes the log reference
-public class NoMutationRecoveryIT extends AccumuloClusterIT {
- private static final Logger log = LoggerFactory.getLogger(NoMutationRecoveryIT.class);
-
- @Override
- public int defaultTimeoutSeconds() {
- return 10 * 60;
- }
-
- @Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Before
- public void takeTraceTableOffline() throws Exception {
- Connector conn = getConnector();
- if (conn.tableOperations().exists("trace")) {
- conn.tableOperations().offline("trace", true);
- }
- }
-
- @After
- public void takeTraceTableOnline() throws Exception {
- Connector conn = getConnector();
- if (conn.tableOperations().exists("trace")) {
- conn.tableOperations().online("trace", true);
- }
- }
-
- public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) {
- // comparison, without timestamp
- Key akey = a.getKey();
- Key bkey = b.getKey();
- log.info("Comparing {} to {}", akey.toStringNoTruncate(), bkey.toStringNoTruncate());
- return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue());
- }
-
- @Test
- public void test() throws Exception {
- Connector conn = getConnector();
- final String table = getUniqueNames(1)[0];
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
-
- log.info("Created {} with id {}", table, tableId);
-
- // Add a record to the table
- update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes()));
-
- // Get the WAL reference used by the table we just added the update to
- Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME);
-
- log.info("Log reference in metadata table {} {}", logRef.getKey().toStringNoTruncate(), logRef.getValue());
-
- // Flush the record to disk
- conn.tableOperations().flush(table, null, null, true);
-
- Range range = Range.prefix(tableId);
- log.info("Fetching WAL references over " + table);
- assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, range)));
-
- // Grant permission to the admin user to write to the Metadata table
- conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
-
- // Add the wal record back to the metadata table
- update(conn, MetadataTable.NAME, logRef);
-
- // Assert that we can get the bogus update back out again
- assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME)));
-
- conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
- conn.tableOperations().flush(RootTable.NAME, null, null, true);
-
- ClusterControl control = cluster.getClusterControl();
- control.stopAllServers(ServerType.TABLET_SERVER);
- control.startAllServers(ServerType.TABLET_SERVER);
-
- // Verify that we can read the original record we wrote
- Scanner s = conn.createScanner(table, Authorizations.EMPTY);
- int count = 0;
- for (Entry<Key,Value> e : s) {
- assertEquals(e.getKey().getRow().toString(), "row");
- assertEquals(e.getKey().getColumnFamily().toString(), "cf");
- assertEquals(e.getKey().getColumnQualifier().toString(), "cq");
- assertEquals(e.getValue().toString(), "value");
- count++;
- }
- assertEquals(1, count);
-
- // Verify that the bogus log reference we wrote it gone
- for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) {
- assertFalse("Unexpected found reference to bogus log entry: " + ref.getKey().toStringNoTruncate() + " " + ref.getValue(), equals(ref, logRef));
- }
- }
-
- private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception {
- Key k = logRef.getKey();
- update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue());
- }
-
- private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception {
- return getLogRefs(conn, table, new Range());
- }
-
- private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception {
- Scanner s = conn.createScanner(table, Authorizations.EMPTY);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
- s.setRange(r);
- return s;
- }
-
- private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception {
- return getLogRefs(conn, table).iterator().next();
- }
-
- private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception {
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
- Mutation m = new Mutation(row);
- m.put(cf, cq, value);
- bw.addMutation(m);
- bw.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 8b37169..5c5b95d 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -349,7 +349,7 @@ public class ShellServerIT extends SharedMiniClusterIT {
ts.exec("config -t " + table2 + " -np", true, "345M", true);
ts.exec("getsplits -t " + table2, true, "row5", true);
ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true);
- ts.exec("onlinetable " + table, true);
+ ts.exec("online " + table, true);
ts.exec("deletetable -f " + table, true);
ts.exec("deletetable -f " + table2, true);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
new file mode 100644
index 0000000..03d783c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// When reviewing the changes for ACCUMULO-3423, kturner suggested:
+// "tablets will now have log references that contain no data,
+// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
+// It would be useful to have an IT that will test this situation."
+public class UnusedWALIT extends ConfigurableMacIT {
+
+  /**
+   * Configures the mini cluster with one tablet server, a 5s ZooKeeper timeout and a 10MB maximum write-ahead log size, and points Hadoop's "file" scheme at
+   * the raw local file system.
+   *
+   * @param cfg
+   *          mini cluster configuration to modify
+   * @param hadoopCoreSite
+   *          Hadoop configuration to modify
+   */
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    final long logSize = 1024 * 1024 * 10;
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
+    cfg.setNumTservers(1);
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
+  }
+
+  /**
+   * Writes a little data to two tables, pushes enough data into the big table to roll the write-ahead log, writes more data to the little table, restarts the
+   * tablet server and then verifies that both batches written to the little table can still be read.
+   */
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    // don't want this bad boy cleaning up walog entries
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+
+    // make two tables
+    String[] tableNames = getUniqueNames(2);
+    String bigTable = tableNames[0];
+    String lilTable = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(bigTable);
+    c.tableOperations().create(lilTable);
+
+    // put some data in a log that should be replayed for both tables
+    writeSomeData(c, bigTable, 0, 10, 0, 10);
+    scanSomeData(c, bigTable, 0, 10, 0, 10);
+    writeSomeData(c, lilTable, 0, 1, 0, 1);
+    scanSomeData(c, lilTable, 0, 1, 0, 1);
+    assertEquals(1, getWALCount(c));
+
+    // roll the logs by pushing data into bigTable
+    writeSomeData(c, bigTable, 0, 3000, 0, 1000);
+    assertEquals(2, getWALCount(c));
+
+    // put some data in the latest log
+    writeSomeData(c, lilTable, 1, 10, 0, 10);
+    scanSomeData(c, lilTable, 1, 10, 0, 10);
+
+    // bounce the tserver
+    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+    // wait for the metadata table to be online; the scan result itself is not needed
+    Scanner metaScanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    try {
+      Iterators.size(metaScanner.iterator());
+    } finally {
+      metaScanner.close();
+    }
+
+    // check our two sets of data in different logs
+    scanSomeData(c, lilTable, 0, 1, 0, 1);
+    scanSomeData(c, lilTable, 1, 10, 0, 10);
+  }
+
+  /**
+   * Scans the given table and asserts that it returns, in order, the hex-encoded rows {@code startRow} up to (but excluding) {@code startRow + rowCount}, each
+   * holding the hex-encoded column qualifiers {@code startCol} up to (but excluding) {@code startCol + colCount}.
+   *
+   * @param c
+   *          connector used to create the scanner
+   * @param table
+   *          table to scan
+   * @param startRow
+   *          first expected row
+   * @param rowCount
+   *          number of expected rows
+   * @param startCol
+   *          first expected column qualifier in each row
+   * @param colCount
+   *          number of expected column qualifiers in each row
+   */
+  private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+    Scanner s = c.createScanner(table, Authorizations.EMPTY);
+    try {
+      s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
+      int row = startRow;
+      int col = startCol;
+      for (Entry<Key,Value> entry : s) {
+        assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
+        assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16));
+        if (col == startCol + colCount) {
+          // finished a row: reset the column counter and move to the next expected row
+          col = startCol;
+          row++;
+          if (row == startRow + rowCount) {
+            // the range's end row is inclusive, so stop once every expected row has been seen
+            break;
+          }
+        }
+      }
+      // all expected rows must have been seen; JUnit's argument order is (expected, actual)
+      assertEquals(startRow + rowCount, row);
+    } finally {
+      // release the scanner even when an assertion fails, as getWALCount does
+      s.close();
+    }
+  }
+
+  /**
+   * Counts the entries of the metadata table that fall within {@link CurrentLogsSection#getRange()}.
+   *
+   * @param c
+   *          connector used to create the scanner
+   * @return the number of entries found
+   */
+  private int getWALCount(Connector c) throws Exception {
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(CurrentLogsSection.getRange());
+    try {
+      return Iterators.size(s.iterator());
+    } finally {
+      s.close();
+    }
+  }
+
+  /**
+   * Writes one mutation per row for the hex-encoded rows {@code startRow} up to (but excluding) {@code startRow + rowCount}; each mutation holds an empty
+   * value under an empty column family for the hex-encoded qualifiers {@code startCol} up to (but excluding) {@code startCol + colCount}.
+   *
+   * @param conn
+   *          connector used to create the batch writer
+   * @param table
+   *          table to write to
+   * @param startRow
+   *          first row to write
+   * @param rowCount
+   *          number of rows to write
+   * @param startCol
+   *          first column qualifier to write in each row
+   * @param colCount
+   *          number of column qualifiers to write in each row
+   */
+  private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxMemory(10 * 1024 * 1024);
+    BatchWriter bw = conn.createBatchWriter(table, config);
+    for (int r = startRow; r < startRow + rowCount; r++) {
+      Mutation m = new Mutation(Integer.toHexString(r));
+      for (int c = startCol; c < startCol + colCount; c++) {
+        m.put("", Integer.toHexString(c), "");
+      }
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index d9b9429..e2a0e03 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -103,6 +103,7 @@ public class VolumeIT extends ConfigurableMacIT {
cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -425,6 +426,21 @@ public class VolumeIT extends ConfigurableMacIT {
Assert.fail("Unexpected volume " + path);
}
+ Text path = new Text();
+ for (String table : new String[]{RootTable.NAME, MetadataTable.NAME}) {
+ Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
+ meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ outer: for (Entry<Key,Value> entry : meta) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ for (int i = 0; i < paths.length; i++) {
+ if (path.toString().startsWith(paths[i].toString())) {
+ continue outer;
+ }
+ }
+ Assert.fail("Unexpected volume " + path);
+ }
+ }
+
// if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
// 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
@@ -435,6 +451,7 @@ public class VolumeIT extends ConfigurableMacIT {
}
Assert.assertEquals(200, sum);
+
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 099743d..1f3e600 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -72,9 +73,11 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.TestMultiTableIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
@@ -84,6 +87,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
public class ReadWriteIT extends AccumuloClusterIT {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { // overrides the mini-cluster configuration for this IT
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // sets the instance ZooKeeper timeout to 5s; NOTE(review): presumably so a stopped server is noticed quickly - confirm
+ }
+
private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class);
static final int ROWS = 200000;