You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/12/28 17:31:21 UTC

[accumulo] branch master updated: More cleanup for WAL (#842)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 56b21ca  More cleanup for WAL (#842)
56b21ca is described below

commit 56b21ca4395b54e525850437d70e7ec28568d2b3
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Dec 28 12:31:16 2018 -0500

    More cleanup for WAL (#842)
    
    * Only synchronize on write method
    * Actually log MUTATION for single mutation
    * Make TabletMutations constructor take CommitSession
    * Simplify calls to defineTablet and logFileData
    * Remove unused datafile path param
    * Fix slf4j debug statement
---
 .../apache/accumulo/tserver/TabletMutations.java   | 14 ++---
 .../org/apache/accumulo/tserver/TabletServer.java  | 15 +++---
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 63 +++++++++++-----------
 .../accumulo/tserver/log/TabletServerLogger.java   | 32 +++--------
 .../apache/accumulo/tserver/log/DfsLoggerTest.java | 24 ++++++---
 5 files changed, 68 insertions(+), 80 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index 6a65097..32d70fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -20,16 +20,16 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
 
 public class TabletMutations {
-  private final int tid;
-  private final long seq;
+  private CommitSession commitSession;
   private final List<Mutation> mutations;
   private final Durability durability;
 
-  public TabletMutations(int tid, long seq, List<Mutation> mutations, Durability durability) {
-    this.tid = tid;
-    this.seq = seq;
+  public TabletMutations(CommitSession commitSession, List<Mutation> mutations,
+      Durability durability) {
+    this.commitSession = commitSession;
     this.mutations = mutations;
     this.durability = durability;
   }
@@ -39,11 +39,11 @@ public class TabletMutations {
   }
 
   public int getTid() {
-    return tid;
+    return commitSession.getLogId();
   }
 
   public long getSeq() {
-    return seq;
+    return commitSession.getWALogSeq();
   }
 
   public Durability getDurability() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ee1d455..08c5495 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1045,8 +1045,8 @@ public class TabletServer implements Runnable {
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
                 if (durability != Durability.NONE) {
-                  loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
-                      commitSession.getWALogSeq(), mutations, durability));
+                  loggables.put(commitSession,
+                      new TabletMutations(commitSession, mutations, durability));
                 }
                 sendables.put(commitSession, mutations);
                 mutationCount += mutations.size();
@@ -1063,8 +1063,7 @@ public class TabletServer implements Runnable {
                 // prepareMutationsForCommit() expects
                 CommitSession cs = e.getCommitSession();
                 if (durability != Durability.NONE) {
-                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                      e.getNonViolators(), durability));
+                  loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
                 }
                 sendables.put(cs, e.getNonViolators());
               }
@@ -1272,7 +1271,7 @@ public class TabletServer implements Runnable {
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation, durability);
+              logger.log(cs, mutation, durability);
             } finally {
               wal.stop();
             }
@@ -1390,8 +1389,7 @@ public class TabletServer implements Runnable {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
                   if (durability != Durability.NONE) {
-                    loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                        mutations, durability));
+                    loggables.put(cs, new TabletMutations(cs, mutations, durability));
                   }
                   sendables.put(cs, mutations);
                 }
@@ -1400,8 +1398,7 @@ public class TabletServer implements Runnable {
               CommitSession cs = e.getCommitSession();
               if (e.getNonViolators().size() > 0) {
                 if (durability != Durability.NONE) {
-                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                      e.getNonViolators(), durability));
+                  loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
                 }
                 sendables.put(cs, e.getNonViolators());
                 for (Mutation m : e.getNonViolators())
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 5f9c0a8..721b0d8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -17,10 +17,12 @@
 package org.apache.accumulo.tserver.log;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
 import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
@@ -32,7 +34,6 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -49,7 +50,6 @@ import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
 import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.cryptoImpl.NoCryptoService;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
@@ -66,6 +66,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -450,7 +451,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
       key.event = OPEN;
       key.tserverSession = filename;
       key.filename = filename;
-      op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
+      op = logKeyData(key, Durability.SYNC);
     } catch (Exception ex) {
       if (logFile != null)
         logFile.close();
@@ -537,22 +538,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
       }
   }
 
-  public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException {
+  public LoggerOperation defineTablet(CommitSession cs) throws IOException {
     // write this log to the METADATA table
     final LogFileKey key = new LogFileKey();
     key.event = DEFINE_TABLET;
-    key.seq = seq;
-    key.tabletId = tid;
-    key.tablet = tablet;
-    try {
-      write(key, EMPTY);
-    } catch (ClosedChannelException ex) {
-      throw new LogClosedException();
-    } catch (IllegalArgumentException e) {
-      log.error("Signature of sync method changed. Accumulo is likely"
-          + " incompatible with this version of Hadoop.");
-      throw new RuntimeException(e);
-    }
+    key.seq = cs.getWALogSeq();
+    key.tabletId = cs.getLogId();
+    key.tablet = cs.getExtent();
+    return logKeyData(key, Durability.LOG);
   }
 
   private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
@@ -561,26 +554,22 @@ public class DfsLogger implements Comparable<DfsLogger> {
     encryptingLogFile.flush();
   }
 
-  public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability)
-      throws IOException {
-    return logManyTablets(Collections.singletonList(
-        new TabletMutations(tid, seq, Collections.singletonList(mutation), durability)));
+  private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException {
+    return logFileData(singletonList(new Pair<>(key, EMPTY)), d);
   }
 
   private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
       Durability durability) throws IOException {
     DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
-    synchronized (DfsLogger.this) {
-      try {
-        for (Pair<LogFileKey,LogFileValue> pair : keys) {
-          write(pair.getFirst(), pair.getSecond());
-        }
-      } catch (ClosedChannelException ex) {
-        throw new LogClosedException();
-      } catch (Exception e) {
-        log.error("Failed to write log entries", e);
-        work.exception = e;
+    try {
+      for (Pair<LogFileKey,LogFileValue> pair : keys) {
+        write(pair.getFirst(), pair.getSecond());
       }
+    } catch (ClosedChannelException ex) {
+      throw new LogClosedException();
+    } catch (Exception e) {
+      log.error("Failed to write log entries", e);
+      work.exception = e;
     }
 
     if (durability == Durability.LOG)
@@ -614,6 +603,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
     return logFileData(data, durability);
   }
 
+  public LoggerOperation log(CommitSession cs, Mutation m, Durability d) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = MUTATION;
+    key.seq = cs.getWALogSeq();
+    key.tabletId = cs.getLogId();
+    LogFileValue value = new LogFileValue();
+    value.mutations = singletonList(m);
+    return logFileData(singletonList(new Pair<>(key, value)), d);
+  }
+
   /**
    * Return the Durability with the highest precedence
    */
@@ -631,7 +630,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tabletId = tid;
-    return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+    return logKeyData(key, durability);
   }
 
   public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn,
@@ -641,7 +640,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
     key.seq = seq;
     key.tabletId = tid;
     key.filename = fqfn;
-    return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+    return logKeyData(key, durability);
   }
 
   public String getLogger() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 6aac02e..5907902 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -275,7 +275,7 @@ public class TabletServerLogger {
             alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter);
             alog.open(tserver.getClientAddressString());
             String fileName = alog.getFileName();
-            log.debug("Created next WAL " + fileName);
+            log.debug("Created next WAL {}", fileName);
             tserver.addNewLogMarker(alog);
             while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
               log.info("Our WAL was not used for 12 hours: {}", fileName);
@@ -348,9 +348,8 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        DfsLogger copy = null;
         AtomicInteger currentId = new AtomicInteger(-1);
-        copy = initializeLoggers(currentId);
+        DfsLogger copy = initializeLoggers(currentId);
         currentLogId = currentId.get();
 
         // add the logger to the log set for the memory in the tablet,
@@ -361,7 +360,8 @@ public class TabletServerLogger {
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
-                defineTablet(commitSession, writeRetry);
+                write(singletonList(commitSession), false,
+                    logger -> logger.defineTablet(commitSession), writeRetry);
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }
@@ -441,26 +441,15 @@ public class TabletServerLogger {
     });
   }
 
-  public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
-      throws IOException {
-    // scribble this into the metadata tablet, too.
-    write(singletonList(commitSession), false, logger -> {
-      logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
-          commitSession.getExtent());
-      return DfsLogger.NO_WAIT_LOGGER_OP;
-    }, writeRetry);
-  }
-
   /**
    * Log a single mutation. This method expects mutations that have a durability other than NONE.
    */
-  public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m,
-      final Durability durability) throws IOException {
+  public void log(final CommitSession commitSession, final Mutation m, final Durability durability)
+      throws IOException {
     if (durability == Durability.DEFAULT || durability == Durability.NONE) {
       throw new IllegalArgumentException("Unexpected durability " + durability);
     }
-    write(singletonList(commitSession), false,
-        logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+    write(singletonList(commitSession), false, logger -> logger.log(commitSession, m, durability),
         writeRetryFactory.createRetry());
     logSizeEstimate.addAndGet(m.numBytes());
   }
@@ -486,16 +475,9 @@ public class TabletServerLogger {
 
   public void minorCompactionFinished(final CommitSession commitSession, final long walogSeq,
       final Durability durability) throws IOException {
-
-    long t1 = System.currentTimeMillis();
-
     write(singletonList(commitSession), true,
         logger -> logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), durability),
         writeRetryFactory.createRetry());
-
-    long t2 = System.currentTimeMillis();
-
-    log.debug(" wrote MinC finish: writeTime:{}ms  durability:{}", (t2 - t1), durability);
   }
 
   public long minorCompactionStarted(final CommitSession commitSession, final long seq,
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 80a6ff5..1073e46 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class DfsLoggerTest {
@@ -32,26 +34,34 @@ public class DfsLoggerTest {
   @Test
   public void testDurabilityForGroupCommit() {
     List<TabletMutations> lst = new ArrayList<>();
+    CommitSession commitSession = EasyMock.createMock(CommitSession.class);
     assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+    TabletMutations m1 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.NONE);
     lst.add(m1);
     assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+    TabletMutations m2 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.LOG);
     lst.add(m2);
     assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+    TabletMutations m3 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.NONE);
     lst.add(m3);
     assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+    TabletMutations m4 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.FLUSH);
     lst.add(m4);
     assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+    TabletMutations m5 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.LOG);
     lst.add(m5);
     assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
+    TabletMutations m6 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.SYNC);
     lst.add(m6);
     assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+    TabletMutations m7 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.FLUSH);
     lst.add(m7);
     assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
   }