You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/12/12 22:57:54 UTC
[accumulo] branch master updated: Cleanup TabletServerLogger code (#793)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new eff92f1 Cleanup TabletServerLogger code (#793)
eff92f1 is described below
commit eff92f178b01786c4debcc9f927c8cd9a84103a7
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Dec 12 17:57:50 2018 -0500
Cleanup TabletServerLogger code (#793)
* Remove unnecessary object manipulation across method calls
* Remove unused methods
* Replace code with lambdas
* Remove the Mutations class, which is no longer needed
* Remove extra loops for checking durability by adding a maxDurability method
* Make the tserver skip logging mutations when durability is NONE
* Simplify sendables object that calls commit on mutations
---
.../org/apache/accumulo/tserver/Mutations.java | 40 -------
.../org/apache/accumulo/tserver/TabletServer.java | 82 +++++++-------
.../org/apache/accumulo/tserver/log/DfsLogger.java | 24 ++--
.../accumulo/tserver/log/TabletServerLogger.java | 122 +++++----------------
.../apache/accumulo/tserver/log/DfsLoggerTest.java | 25 +++--
5 files changed, 102 insertions(+), 191 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
deleted file mode 100644
index 76061e6..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.util.List;
-
-import org.apache.accumulo.core.client.Durability;
-import org.apache.accumulo.core.data.Mutation;
-
-public class Mutations {
- private final Durability durability;
- private final List<Mutation> mutations;
-
- Mutations(Durability durability, List<Mutation> mutations) {
- this.durability = durability;
- this.mutations = mutations;
- }
-
- public Durability getDurability() {
- return durability;
- }
-
- public List<Mutation> getMutations() {
- return mutations;
- }
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 51962bd..97cdc9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1013,7 +1013,8 @@ public class TabletServer implements Runnable {
private void flush(UpdateSession us) {
int mutationCount = 0;
- Map<CommitSession,Mutations> sendables = new HashMap<>();
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+ Map<CommitSession,TabletMutations> loggables = new HashMap<>();
Throwable error = null;
long pt1 = System.currentTimeMillis();
@@ -1031,7 +1032,8 @@ public class TabletServer implements Runnable {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
Tablet tablet = entry.getKey();
- Durability tabletDurability = tablet.getDurability();
+ Durability durability = DurabilityImpl.resolveDurabilty(us.durability,
+ tablet.getDurability());
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
@@ -1045,8 +1047,11 @@ public class TabletServer implements Runnable {
}
us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
} else {
- sendables.put(commitSession, new Mutations(
- DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
+ if (durability != Durability.NONE) {
+ loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
+ commitSession.getWALogSeq(), mutations, durability));
+ }
+ sendables.put(commitSession, mutations);
mutationCount += mutations.size();
}
@@ -1059,9 +1064,12 @@ public class TabletServer implements Runnable {
// only log and commit mutations if there were some
// that did not violate constraints... this is what
// prepareMutationsForCommit() expects
- sendables.put(e.getCommitSession(),
- new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability),
- e.getNonViolators()));
+ CommitSession cs = e.getCommitSession();
+ if (durability != Durability.NONE) {
+ loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+ e.getNonViolators(), durability));
+ }
+ sendables.put(cs, e.getNonViolators());
}
mutationCount += mutations.size();
@@ -1082,9 +1090,7 @@ public class TabletServer implements Runnable {
updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
if (error != null) {
- for (Entry<CommitSession,Mutations> e : sendables.entrySet()) {
- e.getKey().abortCommit(e.getValue().getMutations());
- }
+ sendables.forEach(CommitSession::abortCommit);
throw new RuntimeException(error);
}
try {
@@ -1094,7 +1100,7 @@ public class TabletServer implements Runnable {
try {
long t1 = System.currentTimeMillis();
- logger.logManyTablets(sendables);
+ logger.logManyTablets(loggables);
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
@@ -1115,12 +1121,8 @@ public class TabletServer implements Runnable {
Span commit = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
- for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
- CommitSession commitSession = entry.getKey();
- List<Mutation> mutations = entry.getValue().getMutations();
-
+ sendables.forEach((commitSession, mutations) -> {
commitSession.commit(mutations);
-
KeyExtent extent = commitSession.getExtent();
if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
@@ -1131,7 +1133,7 @@ public class TabletServer implements Runnable {
us.successfulCommits.increment(us.currentTablet,
us.queuedMutations.get(us.currentTablet).size());
}
- }
+ });
long t2 = System.currentTimeMillis();
us.flushTime += (t2 - pt1);
@@ -1266,12 +1268,14 @@ public class TabletServer implements Runnable {
throw new NotServingTabletException(tkeyExtent);
}
- while (true) {
+ Durability durability = DurabilityImpl
+ .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
+ // instead of always looping on true, skip completely when durability is NONE
+ while (durability != Durability.NONE) {
try {
final Span wal = Trace.start("wal");
try {
- logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl
- .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability));
+ logger.log(cs, cs.getWALogSeq(), mutation, durability);
} finally {
wal.stop();
}
@@ -1356,7 +1360,8 @@ public class TabletServer implements Runnable {
ArrayList<TCMResult> results, ConditionalSession sess) {
Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
- Map<CommitSession,Mutations> sendables = new HashMap<>();
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+ Map<CommitSession,TabletMutations> loggables = new HashMap<>();
boolean sessionCanceled = sess.interruptFlag.get();
@@ -1369,7 +1374,8 @@ public class TabletServer implements Runnable {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
- final Durability tabletDurability = tablet.getDurability();
+ final Durability durability = DurabilityImpl.resolveDurabilty(sess.durability,
+ tablet.getDurability());
try {
@SuppressWarnings("unchecked")
@@ -1386,18 +1392,21 @@ public class TabletServer implements Runnable {
} else {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
- sendables.put(cs,
- new Mutations(
- DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
- mutations));
+ if (durability != Durability.NONE) {
+ loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+ mutations, durability));
+ }
+ sendables.put(cs, mutations);
}
}
} catch (TConstraintViolationException e) {
+ CommitSession cs = e.getCommitSession();
if (e.getNonViolators().size() > 0) {
- sendables.put(e.getCommitSession(),
- new Mutations(
- DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
- e.getNonViolators()));
+ if (durability != Durability.NONE) {
+ loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+ e.getNonViolators(), durability));
+ }
+ sendables.put(cs, e.getNonViolators());
for (Mutation m : e.getNonViolators())
results.add(
new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
@@ -1418,10 +1427,10 @@ public class TabletServer implements Runnable {
Span walSpan = Trace.start("wal");
try {
- while (sendables.size() > 0) {
+ while (loggables.size() > 0) {
try {
long t1 = System.currentTimeMillis();
- logger.logManyTablets(sendables);
+ logger.logManyTablets(loggables);
long t2 = System.currentTimeMillis();
updateWalogWriteTime(t2 - t1);
break;
@@ -1440,12 +1449,7 @@ public class TabletServer implements Runnable {
Span commitSpan = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
- for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
- CommitSession commitSession = entry.getKey();
- List<Mutation> mutations = entry.getValue().getMutations();
-
- commitSession.commit(mutations);
- }
+ sendables.forEach(CommitSession::commit);
long t2 = System.currentTimeMillis();
updateAvgCommitTime(t2 - t1, sendables.size());
} finally {
@@ -3389,7 +3393,7 @@ public class TabletServer implements Runnable {
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
recoveryLogs.add(recovery);
}
- logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
+ logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
}
public int createLogId() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cdb787f..a0bebab 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Method;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -597,7 +598,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
return new LoggerOperation(work);
}
- public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+ public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) throws IOException {
Durability durability = Durability.NONE;
List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<>();
for (TabletMutations tabletMutations : mutations) {
@@ -608,21 +609,20 @@ public class DfsLogger implements Comparable<DfsLogger> {
LogFileValue value = new LogFileValue();
value.mutations = tabletMutations.getMutations();
data.add(new Pair<>(key, value));
- if (tabletMutations.getDurability().ordinal() > durability.ordinal()) {
- durability = tabletMutations.getDurability();
- }
+ durability = maxDurability(tabletMutations.getDurability(), durability);
}
- return logFileData(data, chooseDurabilityForGroupCommit(mutations));
+ return logFileData(data, durability);
}
- static Durability chooseDurabilityForGroupCommit(List<TabletMutations> mutations) {
- Durability result = Durability.NONE;
- for (TabletMutations tabletMutations : mutations) {
- if (tabletMutations.getDurability().ordinal() > result.ordinal()) {
- result = tabletMutations.getDurability();
- }
+ /**
+ * Return the Durability with the highest precedence
+ */
+ static Durability maxDurability(Durability dur1, Durability dur2) {
+ if (dur1.ordinal() > dur2.ordinal()) {
+ return dur1;
+ } else {
+ return dur2;
}
- return result;
}
public LoggerOperation minorCompactionFinished(long seq, int tid, String fqfn,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 4f70880..1a1bfa2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -16,15 +16,13 @@
*/
package org.apache.accumulo.tserver.log;
+import static java.util.Collections.singletonList;
+
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -44,13 +42,11 @@ import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.fate.util.Retry.RetryFactory;
-import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.tserver.Mutations;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
@@ -320,15 +316,6 @@ public class TabletServerLogger {
}));
}
- public void resetLoggers() throws IOException {
- logIdLock.writeLock().lock();
- try {
- close();
- } finally {
- logIdLock.writeLock().unlock();
- }
- }
-
private synchronized void close() throws IOException {
if (!logIdLock.isWriteLockedByCurrentThread()) {
throw new IllegalStateException("close should be called with write lock held!");
@@ -356,22 +343,6 @@ public class TabletServerLogger {
LoggerOperation write(DfsLogger logger) throws Exception;
}
- private void write(CommitSession commitSession, boolean mincFinish, Writer writer)
- throws IOException {
- write(commitSession, mincFinish, writer, writeRetryFactory.createRetry());
- }
-
- private void write(CommitSession commitSession, boolean mincFinish, Writer writer,
- Retry writeRetry) throws IOException {
- List<CommitSession> sessions = Collections.singletonList(commitSession);
- write(sessions, mincFinish, writer, writeRetry);
- }
-
- private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer)
- throws IOException {
- write(sessions, mincFinish, writer, writeRetryFactory.createRetry());
- }
-
private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer,
Retry writeRetry) throws IOException {
// Work very hard not to lock this during calls to the outside world
@@ -454,7 +425,6 @@ public class TabletServerLogger {
@Override
void withWriteLock() throws IOException {
close();
- closeForReplication(sessions);
}
});
}
@@ -471,70 +441,44 @@ public class TabletServerLogger {
@Override
void withWriteLock() throws IOException {
close();
- closeForReplication(sessions);
}
});
}
- protected void closeForReplication(Collection<CommitSession> sessions) {
- // TODO We can close the WAL here for replication purposes
- }
-
public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
throws IOException {
// scribble this into the metadata tablet, too.
- write(commitSession, false, new Writer() {
- @Override
- public LoggerOperation write(DfsLogger logger) throws Exception {
- logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
- commitSession.getExtent());
- return DfsLogger.NO_WAIT_LOGGER_OP;
- }
+ write(singletonList(commitSession), false, logger -> {
+ logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
+ commitSession.getExtent());
+ return DfsLogger.NO_WAIT_LOGGER_OP;
}, writeRetry);
}
+ /**
+ * Log a single mutation. This method expects mutations that have a durability other than NONE.
+ */
public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m,
final Durability durability) throws IOException {
- if (durability == Durability.NONE) {
- return;
- }
- if (durability == Durability.DEFAULT) {
+ if (durability == Durability.DEFAULT || durability == Durability.NONE) {
throw new IllegalArgumentException("Unexpected durability " + durability);
}
- write(commitSession, false, new Writer() {
- @Override
- public LoggerOperation write(DfsLogger logger) throws Exception {
- return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
- }
- });
+ write(singletonList(commitSession), false,
+ logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+ writeRetryFactory.createRetry());
logSizeEstimate.addAndGet(m.numBytes());
}
- public void logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException {
-
- final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations);
- for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
- if (entry.getValue().getDurability() == Durability.NONE) {
- loggables.remove(entry.getKey());
- }
- }
+ /**
+ * Log mutations. This method expects mutations that have a durability other than NONE.
+ */
+ public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws IOException {
if (loggables.size() == 0)
return;
- write(loggables.keySet(), false, new Writer() {
- @Override
- public LoggerOperation write(DfsLogger logger) throws Exception {
- List<TabletMutations> copy = new ArrayList<>(loggables.size());
- for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
- CommitSession cs = entry.getKey();
- Durability durability = entry.getValue().getDurability();
- copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
- entry.getValue().getMutations(), durability));
- }
- return logger.logManyTablets(copy);
- }
- });
- for (Mutations entry : loggables.values()) {
+ write(loggables.keySet(), false, logger -> logger.logManyTablets(loggables.values()),
+ writeRetryFactory.createRetry());
+ for (TabletMutations entry : loggables.values()) {
if (entry.getMutations().size() < 1) {
throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
}
@@ -550,13 +494,10 @@ public class TabletServerLogger {
long t1 = System.currentTimeMillis();
- write(commitSession, true, new Writer() {
- @Override
- public LoggerOperation write(DfsLogger logger) throws Exception {
- return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(),
- fullyQualifiedFileName, durability);
- }
- });
+ write(
+ singletonList(commitSession), true, logger -> logger.minorCompactionFinished(walogSeq,
+ commitSession.getLogId(), fullyQualifiedFileName, durability),
+ writeRetryFactory.createRetry());
long t2 = System.currentTimeMillis();
@@ -565,18 +506,15 @@ public class TabletServerLogger {
public long minorCompactionStarted(final CommitSession commitSession, final long seq,
final String fullyQualifiedFileName, final Durability durability) throws IOException {
- write(commitSession, false, new Writer() {
- @Override
- public LoggerOperation write(DfsLogger logger) throws Exception {
- return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName,
- durability);
- }
- });
+ write(
+ singletonList(commitSession), false, logger -> logger.minorCompactionStarted(seq,
+ commitSession.getLogId(), fullyQualifiedFileName, durability),
+ writeRetryFactory.createRetry());
return seq;
}
- public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path> logs,
- Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+ public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles,
+ MutationReceiver mr) throws IOException {
try {
SortedLogRecovery recovery = new SortedLogRecovery(fs);
recovery.recover(extent, logs, tabletFiles, mr);
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 0291747..80a6ff5 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -31,28 +32,36 @@ public class DfsLoggerTest {
@Test
public void testDurabilityForGroupCommit() {
List<TabletMutations> lst = new ArrayList<>();
- assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
lst.add(m1);
- assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
lst.add(m2);
- assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
lst.add(m3);
- assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
lst.add(m4);
- assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
lst.add(m5);
- assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
lst.add(m6);
- assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
lst.add(m7);
- assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+ assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
+ }
+
+ static Durability chooseDurabilityForGroupCommit(Collection<TabletMutations> mutations) {
+ Durability result = Durability.NONE;
+ for (TabletMutations tabletMutations : mutations) {
+ result = DfsLogger.maxDurability(tabletMutations.getDurability(), result);
+ }
+ return result;
}
}