You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/12/28 17:31:21 UTC
[accumulo] branch master updated: More cleanup for WAL (#842)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 56b21ca More cleanup for WAL (#842)
56b21ca is described below
commit 56b21ca4395b54e525850437d70e7ec28568d2b3
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Dec 28 12:31:16 2018 -0500
More cleanup for WAL (#842)
* Only synchronize on write method
* Actually log MUTATION for single mutation
* Make TabletMutations constructor take CommitSession
* Simplify calls to defineTablet and logFileData
* Remove unused datafile path param
* Fix slf4j debug statement
---
.../apache/accumulo/tserver/TabletMutations.java | 14 ++---
.../org/apache/accumulo/tserver/TabletServer.java | 15 +++---
.../org/apache/accumulo/tserver/log/DfsLogger.java | 63 +++++++++++-----------
.../accumulo/tserver/log/TabletServerLogger.java | 32 +++--------
.../apache/accumulo/tserver/log/DfsLoggerTest.java | 24 ++++++---
5 files changed, 68 insertions(+), 80 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index 6a65097..32d70fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -20,16 +20,16 @@ import java.util.List;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
public class TabletMutations {
- private final int tid;
- private final long seq;
+ private CommitSession commitSession;
private final List<Mutation> mutations;
private final Durability durability;
- public TabletMutations(int tid, long seq, List<Mutation> mutations, Durability durability) {
- this.tid = tid;
- this.seq = seq;
+ public TabletMutations(CommitSession commitSession, List<Mutation> mutations,
+ Durability durability) {
+ this.commitSession = commitSession;
this.mutations = mutations;
this.durability = durability;
}
@@ -39,11 +39,11 @@ public class TabletMutations {
}
public int getTid() {
- return tid;
+ return commitSession.getLogId();
}
public long getSeq() {
- return seq;
+ return commitSession.getWALogSeq();
}
public Durability getDurability() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ee1d455..08c5495 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1045,8 +1045,8 @@ public class TabletServer implements Runnable {
us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
} else {
if (durability != Durability.NONE) {
- loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
- commitSession.getWALogSeq(), mutations, durability));
+ loggables.put(commitSession,
+ new TabletMutations(commitSession, mutations, durability));
}
sendables.put(commitSession, mutations);
mutationCount += mutations.size();
@@ -1063,8 +1063,7 @@ public class TabletServer implements Runnable {
// prepareMutationsForCommit() expects
CommitSession cs = e.getCommitSession();
if (durability != Durability.NONE) {
- loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
- e.getNonViolators(), durability));
+ loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
}
sendables.put(cs, e.getNonViolators());
}
@@ -1272,7 +1271,7 @@ public class TabletServer implements Runnable {
try {
final Span wal = Trace.start("wal");
try {
- logger.log(cs, cs.getWALogSeq(), mutation, durability);
+ logger.log(cs, mutation, durability);
} finally {
wal.stop();
}
@@ -1390,8 +1389,7 @@ public class TabletServer implements Runnable {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
if (durability != Durability.NONE) {
- loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
- mutations, durability));
+ loggables.put(cs, new TabletMutations(cs, mutations, durability));
}
sendables.put(cs, mutations);
}
@@ -1400,8 +1398,7 @@ public class TabletServer implements Runnable {
CommitSession cs = e.getCommitSession();
if (e.getNonViolators().size() > 0) {
if (durability != Durability.NONE) {
- loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
- e.getNonViolators(), durability));
+ loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
}
sendables.put(cs, e.getNonViolators());
for (Mutation m : e.getNonViolators())
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 5f9c0a8..721b0d8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -17,10 +17,12 @@
package org.apache.accumulo.tserver.log;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
import java.io.DataInputStream;
@@ -32,7 +34,6 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +50,6 @@ import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl;
import org.apache.accumulo.core.cryptoImpl.NoCryptoService;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
import org.apache.accumulo.core.spi.crypto.CryptoService;
@@ -66,6 +66,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -450,7 +451,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
key.event = OPEN;
key.tserverSession = filename;
key.filename = filename;
- op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
+ op = logKeyData(key, Durability.SYNC);
} catch (Exception ex) {
if (logFile != null)
logFile.close();
@@ -537,22 +538,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
}
}
- public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException {
+ public LoggerOperation defineTablet(CommitSession cs) throws IOException {
// write this log to the METADATA table
final LogFileKey key = new LogFileKey();
key.event = DEFINE_TABLET;
- key.seq = seq;
- key.tabletId = tid;
- key.tablet = tablet;
- try {
- write(key, EMPTY);
- } catch (ClosedChannelException ex) {
- throw new LogClosedException();
- } catch (IllegalArgumentException e) {
- log.error("Signature of sync method changed. Accumulo is likely"
- + " incompatible with this version of Hadoop.");
- throw new RuntimeException(e);
- }
+ key.seq = cs.getWALogSeq();
+ key.tabletId = cs.getLogId();
+ key.tablet = cs.getExtent();
+ return logKeyData(key, Durability.LOG);
}
private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
@@ -561,26 +554,22 @@ public class DfsLogger implements Comparable<DfsLogger> {
encryptingLogFile.flush();
}
- public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability)
- throws IOException {
- return logManyTablets(Collections.singletonList(
- new TabletMutations(tid, seq, Collections.singletonList(mutation), durability)));
+ private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException {
+ return logFileData(singletonList(new Pair<>(key, EMPTY)), d);
}
private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
Durability durability) throws IOException {
DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
- synchronized (DfsLogger.this) {
- try {
- for (Pair<LogFileKey,LogFileValue> pair : keys) {
- write(pair.getFirst(), pair.getSecond());
- }
- } catch (ClosedChannelException ex) {
- throw new LogClosedException();
- } catch (Exception e) {
- log.error("Failed to write log entries", e);
- work.exception = e;
+ try {
+ for (Pair<LogFileKey,LogFileValue> pair : keys) {
+ write(pair.getFirst(), pair.getSecond());
}
+ } catch (ClosedChannelException ex) {
+ throw new LogClosedException();
+ } catch (Exception e) {
+ log.error("Failed to write log entries", e);
+ work.exception = e;
}
if (durability == Durability.LOG)
@@ -614,6 +603,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
return logFileData(data, durability);
}
+ public LoggerOperation log(CommitSession cs, Mutation m, Durability d) throws IOException {
+ LogFileKey key = new LogFileKey();
+ key.event = MUTATION;
+ key.seq = cs.getWALogSeq();
+ key.tabletId = cs.getLogId();
+ LogFileValue value = new LogFileValue();
+ value.mutations = singletonList(m);
+ return logFileData(singletonList(new Pair<>(key, value)), d);
+ }
+
/**
* Return the Durability with the highest precedence
*/
@@ -631,7 +630,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
key.event = COMPACTION_FINISH;
key.seq = seq;
key.tabletId = tid;
- return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+ return logKeyData(key, durability);
}
public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn,
@@ -641,7 +640,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
key.seq = seq;
key.tabletId = tid;
key.filename = fqfn;
- return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+ return logKeyData(key, durability);
}
public String getLogger() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 6aac02e..5907902 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -275,7 +275,7 @@ public class TabletServerLogger {
alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
String fileName = alog.getFileName();
- log.debug("Created next WAL " + fileName);
+ log.debug("Created next WAL {}", fileName);
tserver.addNewLogMarker(alog);
while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
log.info("Our WAL was not used for 12 hours: {}", fileName);
@@ -348,9 +348,8 @@ public class TabletServerLogger {
while (!success) {
try {
// get a reference to the loggers that no other thread can touch
- DfsLogger copy = null;
AtomicInteger currentId = new AtomicInteger(-1);
- copy = initializeLoggers(currentId);
+ DfsLogger copy = initializeLoggers(currentId);
currentLogId = currentId.get();
// add the logger to the log set for the memory in the tablet,
@@ -361,7 +360,8 @@ public class TabletServerLogger {
if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
try {
// Scribble out a tablet definition and then write to the metadata table
- defineTablet(commitSession, writeRetry);
+ write(singletonList(commitSession), false,
+ logger -> logger.defineTablet(commitSession), writeRetry);
} finally {
commitSession.finishUpdatingLogsUsed();
}
@@ -441,26 +441,15 @@ public class TabletServerLogger {
});
}
- public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
- throws IOException {
- // scribble this into the metadata tablet, too.
- write(singletonList(commitSession), false, logger -> {
- logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
- commitSession.getExtent());
- return DfsLogger.NO_WAIT_LOGGER_OP;
- }, writeRetry);
- }
-
/**
* Log a single mutation. This method expects mutations that have a durability other than NONE.
*/
- public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m,
- final Durability durability) throws IOException {
+ public void log(final CommitSession commitSession, final Mutation m, final Durability durability)
+ throws IOException {
if (durability == Durability.DEFAULT || durability == Durability.NONE) {
throw new IllegalArgumentException("Unexpected durability " + durability);
}
- write(singletonList(commitSession), false,
- logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+ write(singletonList(commitSession), false, logger -> logger.log(commitSession, m, durability),
writeRetryFactory.createRetry());
logSizeEstimate.addAndGet(m.numBytes());
}
@@ -486,16 +475,9 @@ public class TabletServerLogger {
public void minorCompactionFinished(final CommitSession commitSession, final long walogSeq,
final Durability durability) throws IOException {
-
- long t1 = System.currentTimeMillis();
-
write(singletonList(commitSession), true,
logger -> logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), durability),
writeRetryFactory.createRetry());
-
- long t2 = System.currentTimeMillis();
-
- log.debug(" wrote MinC finish: writeTime:{}ms durability:{}", (t2 - t1), durability);
}
public long minorCompactionStarted(final CommitSession commitSession, final long seq,
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 80a6ff5..1073e46 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.easymock.EasyMock;
import org.junit.Test;
public class DfsLoggerTest {
@@ -32,26 +34,34 @@ public class DfsLoggerTest {
@Test
public void testDurabilityForGroupCommit() {
List<TabletMutations> lst = new ArrayList<>();
+ CommitSession commitSession = EasyMock.createMock(CommitSession.class);
assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
- TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+ TabletMutations m1 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.NONE);
lst.add(m1);
assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
- TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+ TabletMutations m2 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.LOG);
lst.add(m2);
assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
- TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+ TabletMutations m3 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.NONE);
lst.add(m3);
assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
- TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+ TabletMutations m4 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.FLUSH);
lst.add(m4);
assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
- TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+ TabletMutations m5 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.LOG);
lst.add(m5);
assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
- TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
+ TabletMutations m6 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.SYNC);
lst.add(m6);
assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
- TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+ TabletMutations m7 = new TabletMutations(commitSession, Collections.emptyList(),
+ Durability.FLUSH);
lst.add(m7);
assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
}