You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/12/28 17:31:18 UTC

[GitHub] milleruntime closed pull request #842: More cleanup for WAL

milleruntime closed pull request #842: More cleanup for WAL
URL: https://github.com/apache/accumulo/pull/842
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index 6a65097927..32d70fec2f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -20,16 +20,16 @@
 
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
 
 public class TabletMutations {
-  private final int tid;
-  private final long seq;
+  private CommitSession commitSession;
   private final List<Mutation> mutations;
   private final Durability durability;
 
-  public TabletMutations(int tid, long seq, List<Mutation> mutations, Durability durability) {
-    this.tid = tid;
-    this.seq = seq;
+  public TabletMutations(CommitSession commitSession, List<Mutation> mutations,
+      Durability durability) {
+    this.commitSession = commitSession;
     this.mutations = mutations;
     this.durability = durability;
   }
@@ -39,11 +39,11 @@ public TabletMutations(int tid, long seq, List<Mutation> mutations, Durability d
   }
 
   public int getTid() {
-    return tid;
+    return commitSession.getLogId();
   }
 
   public long getSeq() {
-    return seq;
+    return commitSession.getWALogSeq();
   }
 
   public Durability getDurability() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ee1d455ccb..08c54951cb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1045,8 +1045,8 @@ private void flush(UpdateSession us) {
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
                 if (durability != Durability.NONE) {
-                  loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
-                      commitSession.getWALogSeq(), mutations, durability));
+                  loggables.put(commitSession,
+                      new TabletMutations(commitSession, mutations, durability));
                 }
                 sendables.put(commitSession, mutations);
                 mutationCount += mutations.size();
@@ -1063,8 +1063,7 @@ private void flush(UpdateSession us) {
                 // prepareMutationsForCommit() expects
                 CommitSession cs = e.getCommitSession();
                 if (durability != Durability.NONE) {
-                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                      e.getNonViolators(), durability));
+                  loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
                 }
                 sendables.put(cs, e.getNonViolators());
               }
@@ -1272,7 +1271,7 @@ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation, durability);
+              logger.log(cs, mutation, durability);
             } finally {
               wal.stop();
             }
@@ -1390,8 +1389,7 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
                   if (durability != Durability.NONE) {
-                    loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                        mutations, durability));
+                    loggables.put(cs, new TabletMutations(cs, mutations, durability));
                   }
                   sendables.put(cs, mutations);
                 }
@@ -1400,8 +1398,7 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
               CommitSession cs = e.getCommitSession();
               if (e.getNonViolators().size() > 0) {
                 if (durability != Durability.NONE) {
-                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-                      e.getNonViolators(), durability));
+                  loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
                 }
                 sendables.put(cs, e.getNonViolators());
                 for (Mutation m : e.getNonViolators())
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 5f9c0a814e..721b0d8313 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -17,10 +17,12 @@
 package org.apache.accumulo.tserver.log;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
 import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
@@ -32,7 +34,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -49,7 +50,6 @@
 import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.cryptoImpl.NoCryptoService;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
@@ -66,6 +66,7 @@
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -450,7 +451,7 @@ public synchronized void open(String address) throws IOException {
       key.event = OPEN;
       key.tserverSession = filename;
       key.filename = filename;
-      op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
+      op = logKeyData(key, Durability.SYNC);
     } catch (Exception ex) {
       if (logFile != null)
         logFile.close();
@@ -537,22 +538,14 @@ public void close() throws IOException {
       }
   }
 
-  public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException {
+  public LoggerOperation defineTablet(CommitSession cs) throws IOException {
     // write this log to the METADATA table
     final LogFileKey key = new LogFileKey();
     key.event = DEFINE_TABLET;
-    key.seq = seq;
-    key.tabletId = tid;
-    key.tablet = tablet;
-    try {
-      write(key, EMPTY);
-    } catch (ClosedChannelException ex) {
-      throw new LogClosedException();
-    } catch (IllegalArgumentException e) {
-      log.error("Signature of sync method changed. Accumulo is likely"
-          + " incompatible with this version of Hadoop.");
-      throw new RuntimeException(e);
-    }
+    key.seq = cs.getWALogSeq();
+    key.tabletId = cs.getLogId();
+    key.tablet = cs.getExtent();
+    return logKeyData(key, Durability.LOG);
   }
 
   private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
@@ -561,26 +554,22 @@ private synchronized void write(LogFileKey key, LogFileValue value) throws IOExc
     encryptingLogFile.flush();
   }
 
-  public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability)
-      throws IOException {
-    return logManyTablets(Collections.singletonList(
-        new TabletMutations(tid, seq, Collections.singletonList(mutation), durability)));
+  private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException {
+    return logFileData(singletonList(new Pair<>(key, EMPTY)), d);
   }
 
   private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
       Durability durability) throws IOException {
     DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
-    synchronized (DfsLogger.this) {
-      try {
-        for (Pair<LogFileKey,LogFileValue> pair : keys) {
-          write(pair.getFirst(), pair.getSecond());
-        }
-      } catch (ClosedChannelException ex) {
-        throw new LogClosedException();
-      } catch (Exception e) {
-        log.error("Failed to write log entries", e);
-        work.exception = e;
+    try {
+      for (Pair<LogFileKey,LogFileValue> pair : keys) {
+        write(pair.getFirst(), pair.getSecond());
       }
+    } catch (ClosedChannelException ex) {
+      throw new LogClosedException();
+    } catch (Exception e) {
+      log.error("Failed to write log entries", e);
+      work.exception = e;
     }
 
     if (durability == Durability.LOG)
@@ -614,6 +603,16 @@ public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) thr
     return logFileData(data, durability);
   }
 
+  public LoggerOperation log(CommitSession cs, Mutation m, Durability d) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = MUTATION;
+    key.seq = cs.getWALogSeq();
+    key.tabletId = cs.getLogId();
+    LogFileValue value = new LogFileValue();
+    value.mutations = singletonList(m);
+    return logFileData(singletonList(new Pair<>(key, value)), d);
+  }
+
   /**
    * Return the Durability with the highest precedence
    */
@@ -631,7 +630,7 @@ public LoggerOperation minorCompactionFinished(long seq, int tid, Durability dur
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tabletId = tid;
-    return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+    return logKeyData(key, durability);
   }
 
   public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn,
@@ -641,7 +640,7 @@ public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn,
     key.seq = seq;
     key.tabletId = tid;
     key.filename = fqfn;
-    return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
+    return logKeyData(key, durability);
   }
 
   public String getLogger() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 6aac02e499..5907902a9a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -275,7 +275,7 @@ public void run() {
             alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter);
             alog.open(tserver.getClientAddressString());
             String fileName = alog.getFileName();
-            log.debug("Created next WAL " + fileName);
+            log.debug("Created next WAL {}", fileName);
             tserver.addNewLogMarker(alog);
             while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
               log.info("Our WAL was not used for 12 hours: {}", fileName);
@@ -348,9 +348,8 @@ private void write(final Collection<CommitSession> sessions, boolean mincFinish,
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        DfsLogger copy = null;
         AtomicInteger currentId = new AtomicInteger(-1);
-        copy = initializeLoggers(currentId);
+        DfsLogger copy = initializeLoggers(currentId);
         currentLogId = currentId.get();
 
         // add the logger to the log set for the memory in the tablet,
@@ -361,7 +360,8 @@ private void write(final Collection<CommitSession> sessions, boolean mincFinish,
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
-                defineTablet(commitSession, writeRetry);
+                write(singletonList(commitSession), false,
+                    logger -> logger.defineTablet(commitSession), writeRetry);
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }
@@ -441,26 +441,15 @@ void withWriteLock() throws IOException {
     });
   }
 
-  public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
-      throws IOException {
-    // scribble this into the metadata tablet, too.
-    write(singletonList(commitSession), false, logger -> {
-      logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
-          commitSession.getExtent());
-      return DfsLogger.NO_WAIT_LOGGER_OP;
-    }, writeRetry);
-  }
-
   /**
    * Log a single mutation. This method expects mutations that have a durability other than NONE.
    */
-  public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m,
-      final Durability durability) throws IOException {
+  public void log(final CommitSession commitSession, final Mutation m, final Durability durability)
+      throws IOException {
     if (durability == Durability.DEFAULT || durability == Durability.NONE) {
       throw new IllegalArgumentException("Unexpected durability " + durability);
     }
-    write(singletonList(commitSession), false,
-        logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+    write(singletonList(commitSession), false, logger -> logger.log(commitSession, m, durability),
         writeRetryFactory.createRetry());
     logSizeEstimate.addAndGet(m.numBytes());
   }
@@ -486,16 +475,9 @@ public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws
 
   public void minorCompactionFinished(final CommitSession commitSession, final long walogSeq,
       final Durability durability) throws IOException {
-
-    long t1 = System.currentTimeMillis();
-
     write(singletonList(commitSession), true,
         logger -> logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), durability),
         writeRetryFactory.createRetry());
-
-    long t2 = System.currentTimeMillis();
-
-    log.debug(" wrote MinC finish: writeTime:{}ms  durability:{}", (t2 - t1), durability);
   }
 
   public long minorCompactionStarted(final CommitSession commitSession, final long seq,
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 80a6ff5650..1073e46b9d 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -25,6 +25,8 @@
 
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class DfsLoggerTest {
@@ -32,26 +34,34 @@
   @Test
   public void testDurabilityForGroupCommit() {
     List<TabletMutations> lst = new ArrayList<>();
+    CommitSession commitSession = EasyMock.createMock(CommitSession.class);
     assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+    TabletMutations m1 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.NONE);
     lst.add(m1);
     assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+    TabletMutations m2 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.LOG);
     lst.add(m2);
     assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
+    TabletMutations m3 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.NONE);
     lst.add(m3);
     assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+    TabletMutations m4 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.FLUSH);
     lst.add(m4);
     assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
+    TabletMutations m5 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.LOG);
     lst.add(m5);
     assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
+    TabletMutations m6 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.SYNC);
     lst.add(m6);
     assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
-    TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
+    TabletMutations m7 = new TabletMutations(commitSession, Collections.emptyList(),
+        Durability.FLUSH);
     lst.add(m7);
     assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services