You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/12/12 22:57:51 UTC

[GitHub] milleruntime closed pull request #793: Cleanup TabletServerLogger

milleruntime closed pull request #793: Cleanup TabletServerLogger
URL: https://github.com/apache/accumulo/pull/793
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
deleted file mode 100644
index 76061e619e..0000000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.util.List;
-
-import org.apache.accumulo.core.client.Durability;
-import org.apache.accumulo.core.data.Mutation;
-
-public class Mutations {
-  private final Durability durability;
-  private final List<Mutation> mutations;
-
-  Mutations(Durability durability, List<Mutation> mutations) {
-    this.durability = durability;
-    this.mutations = mutations;
-  }
-
-  public Durability getDurability() {
-    return durability;
-  }
-
-  public List<Mutation> getMutations() {
-    return mutations;
-  }
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c8651d745f..e99d138352 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1019,7 +1019,8 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
     private void flush(UpdateSession us) {
 
       int mutationCount = 0;
-      Map<CommitSession,Mutations> sendables = new HashMap<>();
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
       Throwable error = null;
 
       long pt1 = System.currentTimeMillis();
@@ -1037,7 +1038,8 @@ private void flush(UpdateSession us) {
         for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
 
           Tablet tablet = entry.getKey();
-          Durability tabletDurability = tablet.getDurability();
+          Durability durability = DurabilityImpl.resolveDurabilty(us.durability,
+              tablet.getDurability());
           List<Mutation> mutations = entry.getValue();
           if (mutations.size() > 0) {
             try {
@@ -1051,8 +1053,11 @@ private void flush(UpdateSession us) {
                 }
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
-                sendables.put(commitSession, new Mutations(
-                    DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
+                if (durability != Durability.NONE) {
+                  loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
+                      commitSession.getWALogSeq(), mutations, durability));
+                }
+                sendables.put(commitSession, mutations);
                 mutationCount += mutations.size();
               }
 
@@ -1065,9 +1070,12 @@ private void flush(UpdateSession us) {
                 // only log and commit mutations if there were some
                 // that did not violate constraints... this is what
                 // prepareMutationsForCommit() expects
-                sendables.put(e.getCommitSession(),
-                    new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability),
-                        e.getNonViolators()));
+                CommitSession cs = e.getCommitSession();
+                if (durability != Durability.NONE) {
+                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                      e.getNonViolators(), durability));
+                }
+                sendables.put(cs, e.getNonViolators());
               }
 
               mutationCount += mutations.size();
@@ -1088,9 +1096,7 @@ private void flush(UpdateSession us) {
       updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
 
       if (error != null) {
-        for (Entry<CommitSession,Mutations> e : sendables.entrySet()) {
-          e.getKey().abortCommit(e.getValue().getMutations());
-        }
+        sendables.forEach(CommitSession::abortCommit);
         throw new RuntimeException(error);
       }
       try {
@@ -1100,7 +1106,7 @@ private void flush(UpdateSession us) {
             try {
               long t1 = System.currentTimeMillis();
 
-              logger.logManyTablets(sendables);
+              logger.logManyTablets(loggables);
 
               long t2 = System.currentTimeMillis();
               us.walogTimes.addStat(t2 - t1);
@@ -1121,12 +1127,8 @@ private void flush(UpdateSession us) {
         Span commit = Trace.start("commit");
         try {
           long t1 = System.currentTimeMillis();
-          for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
-            CommitSession commitSession = entry.getKey();
-            List<Mutation> mutations = entry.getValue().getMutations();
-
+          sendables.forEach((commitSession, mutations) -> {
             commitSession.commit(mutations);
-
             KeyExtent extent = commitSession.getExtent();
 
             if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
@@ -1137,7 +1139,7 @@ private void flush(UpdateSession us) {
               us.successfulCommits.increment(us.currentTablet,
                   us.queuedMutations.get(us.currentTablet).size());
             }
-          }
+          });
           long t2 = System.currentTimeMillis();
 
           us.flushTime += (t2 - pt1);
@@ -1278,12 +1280,14 @@ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
           throw new NotServingTabletException(tkeyExtent);
         }
 
-        while (true) {
+        Durability durability = DurabilityImpl
+            .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
+        // instead of always looping on true, skip completely when durability is NONE
+        while (durability != Durability.NONE) {
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl
-                  .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability));
+              logger.log(cs, cs.getWALogSeq(), mutation, durability);
             } finally {
               wal.stop();
             }
@@ -1358,7 +1362,8 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
         ArrayList<TCMResult> results, ConditionalSession sess) {
       Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
 
-      Map<CommitSession,Mutations> sendables = new HashMap<>();
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
 
       boolean sessionCanceled = sess.interruptFlag.get();
 
@@ -1371,7 +1376,8 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
             for (ServerConditionalMutation scm : entry.getValue())
               results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
           } else {
-            final Durability tabletDurability = tablet.getDurability();
+            final Durability durability = DurabilityImpl.resolveDurabilty(sess.durability,
+                tablet.getDurability());
             try {
 
               @SuppressWarnings("unchecked")
@@ -1388,18 +1394,21 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
                 } else {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                  sendables.put(cs,
-                      new Mutations(
-                          DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
-                          mutations));
+                  if (durability != Durability.NONE) {
+                    loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                        mutations, durability));
+                  }
+                  sendables.put(cs, mutations);
                 }
               }
             } catch (TConstraintViolationException e) {
+              CommitSession cs = e.getCommitSession();
               if (e.getNonViolators().size() > 0) {
-                sendables.put(e.getCommitSession(),
-                    new Mutations(
-                        DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
-                        e.getNonViolators()));
+                if (durability != Durability.NONE) {
+                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                      e.getNonViolators(), durability));
+                }
+                sendables.put(cs, e.getNonViolators());
                 for (Mutation m : e.getNonViolators())
                   results.add(
                       new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
@@ -1420,10 +1429,10 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
 
       Span walSpan = Trace.start("wal");
       try {
-        while (sendables.size() > 0) {
+        while (loggables.size() > 0) {
           try {
             long t1 = System.currentTimeMillis();
-            logger.logManyTablets(sendables);
+            logger.logManyTablets(loggables);
             long t2 = System.currentTimeMillis();
             updateWalogWriteTime(t2 - t1);
             break;
@@ -1442,12 +1451,7 @@ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutat
       Span commitSpan = Trace.start("commit");
       try {
         long t1 = System.currentTimeMillis();
-        for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
-          CommitSession commitSession = entry.getKey();
-          List<Mutation> mutations = entry.getValue().getMutations();
-
-          commitSession.commit(mutations);
-        }
+        sendables.forEach(CommitSession::commit);
         long t2 = System.currentTimeMillis();
         updateAvgCommitTime(t2 - t1, sendables.size());
       } finally {
@@ -3404,7 +3408,7 @@ public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf
             "Unable to find recovery files for extent " + extent + " logEntry: " + entry);
       recoveryLogs.add(recovery);
     }
-    logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
+    logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
   }
 
   public int createLogId() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cdb787feb6..a0bebabeb9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -31,6 +31,7 @@
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -597,7 +598,7 @@ private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
     return new LoggerOperation(work);
   }
 
-  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+  public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) throws IOException {
     Durability durability = Durability.NONE;
     List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<>();
     for (TabletMutations tabletMutations : mutations) {
@@ -608,21 +609,20 @@ public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IO
       LogFileValue value = new LogFileValue();
       value.mutations = tabletMutations.getMutations();
       data.add(new Pair<>(key, value));
-      if (tabletMutations.getDurability().ordinal() > durability.ordinal()) {
-        durability = tabletMutations.getDurability();
-      }
+      durability = maxDurability(tabletMutations.getDurability(), durability);
     }
-    return logFileData(data, chooseDurabilityForGroupCommit(mutations));
+    return logFileData(data, durability);
   }
 
-  static Durability chooseDurabilityForGroupCommit(List<TabletMutations> mutations) {
-    Durability result = Durability.NONE;
-    for (TabletMutations tabletMutations : mutations) {
-      if (tabletMutations.getDurability().ordinal() > result.ordinal()) {
-        result = tabletMutations.getDurability();
-      }
+  /**
+   * Return the Durability with the highest precedence
+   */
+  static Durability maxDurability(Durability dur1, Durability dur2) {
+    if (dur1.ordinal() > dur2.ordinal()) {
+      return dur1;
+    } else {
+      return dur2;
     }
-    return result;
   }
 
   public LoggerOperation minorCompactionFinished(long seq, int tid, String fqfn,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 4f708802ce..1a1bfa2e98 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -16,15 +16,13 @@
  */
 package org.apache.accumulo.tserver.log;
 
+import static java.util.Collections.singletonList;
+
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,13 +42,11 @@
 import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.fate.util.Retry.RetryFactory;
-import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.tserver.Mutations;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
@@ -320,15 +316,6 @@ public void run() {
     }));
   }
 
-  public void resetLoggers() throws IOException {
-    logIdLock.writeLock().lock();
-    try {
-      close();
-    } finally {
-      logIdLock.writeLock().unlock();
-    }
-  }
-
   private synchronized void close() throws IOException {
     if (!logIdLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("close should be called with write lock held!");
@@ -356,22 +343,6 @@ private synchronized void close() throws IOException {
     LoggerOperation write(DfsLogger logger) throws Exception;
   }
 
-  private void write(CommitSession commitSession, boolean mincFinish, Writer writer)
-      throws IOException {
-    write(commitSession, mincFinish, writer, writeRetryFactory.createRetry());
-  }
-
-  private void write(CommitSession commitSession, boolean mincFinish, Writer writer,
-      Retry writeRetry) throws IOException {
-    List<CommitSession> sessions = Collections.singletonList(commitSession);
-    write(sessions, mincFinish, writer, writeRetry);
-  }
-
-  private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer)
-      throws IOException {
-    write(sessions, mincFinish, writer, writeRetryFactory.createRetry());
-  }
-
   private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer,
       Retry writeRetry) throws IOException {
     // Work very hard not to lock this during calls to the outside world
@@ -454,7 +425,6 @@ boolean test() {
           @Override
           void withWriteLock() throws IOException {
             close();
-            closeForReplication(sessions);
           }
         });
       }
@@ -471,70 +441,44 @@ boolean test() {
       @Override
       void withWriteLock() throws IOException {
         close();
-        closeForReplication(sessions);
       }
     });
   }
 
-  protected void closeForReplication(Collection<CommitSession> sessions) {
-    // TODO We can close the WAL here for replication purposes
-  }
-
   public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
       throws IOException {
     // scribble this into the metadata tablet, too.
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
-            commitSession.getExtent());
-        return DfsLogger.NO_WAIT_LOGGER_OP;
-      }
+    write(singletonList(commitSession), false, logger -> {
+      logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
+          commitSession.getExtent());
+      return DfsLogger.NO_WAIT_LOGGER_OP;
     }, writeRetry);
   }
 
+  /**
+   * Log a single mutation. This method expects mutations that have a durability other than NONE.
+   */
   public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m,
       final Durability durability) throws IOException {
-    if (durability == Durability.NONE) {
-      return;
-    }
-    if (durability == Durability.DEFAULT) {
+    if (durability == Durability.DEFAULT || durability == Durability.NONE) {
       throw new IllegalArgumentException("Unexpected durability " + durability);
     }
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
-      }
-    });
+    write(singletonList(commitSession), false,
+        logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+        writeRetryFactory.createRetry());
     logSizeEstimate.addAndGet(m.numBytes());
   }
 
-  public void logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException {
-
-    final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations);
-    for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
-      if (entry.getValue().getDurability() == Durability.NONE) {
-        loggables.remove(entry.getKey());
-      }
-    }
+  /**
+   * Log mutations. This method expects mutations that have a durability other than NONE.
+   */
+  public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws IOException {
     if (loggables.size() == 0)
       return;
 
-    write(loggables.keySet(), false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        List<TabletMutations> copy = new ArrayList<>(loggables.size());
-        for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
-          CommitSession cs = entry.getKey();
-          Durability durability = entry.getValue().getDurability();
-          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-              entry.getValue().getMutations(), durability));
-        }
-        return logger.logManyTablets(copy);
-      }
-    });
-    for (Mutations entry : loggables.values()) {
+    write(loggables.keySet(), false, logger -> logger.logManyTablets(loggables.values()),
+        writeRetryFactory.createRetry());
+    for (TabletMutations entry : loggables.values()) {
       if (entry.getMutations().size() < 1) {
         throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
       }
@@ -550,13 +494,10 @@ public void minorCompactionFinished(final CommitSession commitSession,
 
     long t1 = System.currentTimeMillis();
 
-    write(commitSession, true, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(),
-            fullyQualifiedFileName, durability);
-      }
-    });
+    write(
+        singletonList(commitSession), true, logger -> logger.minorCompactionFinished(walogSeq,
+            commitSession.getLogId(), fullyQualifiedFileName, durability),
+        writeRetryFactory.createRetry());
 
     long t2 = System.currentTimeMillis();
 
@@ -565,18 +506,15 @@ public LoggerOperation write(DfsLogger logger) throws Exception {
 
   public long minorCompactionStarted(final CommitSession commitSession, final long seq,
       final String fullyQualifiedFileName, final Durability durability) throws IOException {
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName,
-            durability);
-      }
-    });
+    write(
+        singletonList(commitSession), false, logger -> logger.minorCompactionStarted(seq,
+            commitSession.getLogId(), fullyQualifiedFileName, durability),
+        writeRetryFactory.createRetry());
     return seq;
   }
 
-  public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path> logs,
-      Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles,
+      MutationReceiver mr) throws IOException {
     try {
       SortedLogRecovery recovery = new SortedLogRecovery(fs);
       recovery.recover(extent, logs, tabletFiles, mr);
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 0291747108..80a6ff5650 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -19,6 +19,7 @@
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -31,28 +32,36 @@
   @Test
   public void testDurabilityForGroupCommit() {
     List<TabletMutations> lst = new ArrayList<>();
-    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
     TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
     lst.add(m1);
-    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
     TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
     lst.add(m2);
-    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
     TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
     lst.add(m3);
-    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
     TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
     lst.add(m4);
-    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
     TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
     lst.add(m5);
-    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
     TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
     lst.add(m6);
-    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
     TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
     lst.add(m7);
-    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
+  }
+
+  static Durability chooseDurabilityForGroupCommit(Collection<TabletMutations> mutations) {
+    Durability result = Durability.NONE;
+    for (TabletMutations tabletMutations : mutations) {
+      result = DfsLogger.maxDurability(tabletMutations.getDurability(), result);
+    }
+    return result;
   }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services