You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 14:50:49 UTC
aurora git commit: Remove redundant transaction recorder
Repository: aurora
Updated Branches:
refs/heads/master cea43db9d -> 5e008ff7f
Remove redundant transaction recorder
Reviewed at https://reviews.apache.org/r/64283/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5e008ff7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5e008ff7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5e008ff7
Branch: refs/heads/master
Commit: 5e008ff7fe7f5701e1baf2051c2873f655ca7aed
Parents: cea43db
Author: Bill Farner <wf...@apache.org>
Authored: Sun Dec 3 06:50:45 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sun Dec 3 06:50:45 2017 -0800
----------------------------------------------------------------------
.../scheduler/storage/log/LogPersistence.java | 5 +-
.../scheduler/storage/log/StreamManager.java | 9 +-
.../storage/log/StreamManagerImpl.java | 133 ++---------------
.../storage/log/StreamTransaction.java | 40 -----
.../scheduler/storage/log/LogManagerTest.java | 149 +------------------
5 files changed, 21 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
index a0a6b6c..e70e605 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -18,6 +18,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -99,10 +100,8 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
@Override
public void persist(Stream<Op> mutations) throws PersistenceException {
- StreamTransaction transaction = streamManager.startTransaction();
- mutations.forEach(transaction::add);
try {
- transaction.commit();
+ streamManager.commit(mutations.collect(Collectors.toList()));
} catch (CodingException e) {
throw new PersistenceException(e);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index 18da32d..73602cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -14,8 +14,10 @@
package org.apache.aurora.scheduler.storage.log;
import java.util.Iterator;
+import java.util.List;
import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.log.Log;
@@ -52,12 +54,11 @@ public interface StreamManager {
void truncateBefore(Log.Position position);
/**
- * Starts a transaction that can be used to commit a series of ops to the log stream atomically.
+ * Saves operations to the log stream.
*
- * @return StreamTransaction A transaction manager to handle batching up commits to the
- * underlying stream.
+ * @param mutations Operations to save.
*/
- StreamTransaction startTransaction();
+ void commit(List<Op> mutations);
/**
* Adds a snapshot to the log and if successful, truncates the log entries preceding the
http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index c5b107f..9eb309a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -15,33 +15,24 @@ package org.apache.aurora.scheduler.storage.log;
import java.util.Arrays;
import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.inject.Inject;
-import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.primitives.Bytes;
import com.google.inject.assistedinject.Assisted;
import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.storage.Frame;
import org.apache.aurora.gen.storage.FrameHeader;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
@@ -193,8 +184,16 @@ class StreamManagerImpl implements StreamManager {
}
@Override
- public StreamTransactionImpl startTransaction() {
- return new StreamTransactionImpl();
+ public void commit(List<Op> mutations) {
+ if (mutations.isEmpty()) {
+ return;
+ }
+
+ Transaction transaction = new Transaction()
+ .setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION)
+ .setOps(mutations);
+ appendAndGetPosition(LogEntry.transaction(transaction));
+ vars.unSnapshottedTransactions.incrementAndGet();
}
@Override
@@ -235,114 +234,4 @@ class StreamManagerImpl implements StreamManager {
vars.entriesWritten.incrementAndGet();
return firstPosition;
}
-
- final class StreamTransactionImpl implements StreamTransaction {
- private final Transaction transaction =
- new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
- private final AtomicBoolean committed = new AtomicBoolean(false);
-
- StreamTransactionImpl() {
- // supplied by factory method
- }
-
- @Override
- public Log.Position commit() throws CodingException {
- Preconditions.checkState(!committed.getAndSet(true),
- "Can only call commit once per transaction.");
-
- if (!transaction.isSetOps()) {
- return null;
- }
-
- Log.Position position = appendAndGetPosition(LogEntry.transaction(transaction));
- vars.unSnapshottedTransactions.incrementAndGet();
- return position;
- }
-
- @Override
- public void add(Op op) {
- Preconditions.checkState(!committed.get());
-
- Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
- if (prior == null || !coalesce(prior, op)) {
- transaction.addToOps(op);
- }
- }
-
- /**
- * Tries to coalesce a new op into the prior to compact the binary representation and increase
- * batching.
- *
- * @param prior The previous op.
- * @param next The next op to be added.
- * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
- */
- private boolean coalesce(Op prior, Op next) {
- if (!prior.isSet() && !next.isSet()) {
- return false;
- }
-
- Op._Fields priorType = prior.getSetField();
- if (!priorType.equals(next.getSetField())) {
- return false;
- }
-
- switch (priorType) {
- case SAVE_FRAMEWORK_ID:
- prior.setSaveFrameworkId(next.getSaveFrameworkId());
- return true;
- case SAVE_TASKS:
- coalesce(prior.getSaveTasks(), next.getSaveTasks());
- return true;
- case REMOVE_TASKS:
- coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
- return true;
- case SAVE_HOST_ATTRIBUTES:
- return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
- default:
- return false;
- }
- }
-
- private void coalesce(SaveTasks prior, SaveTasks next) {
- if (next.isSetTasks()) {
- if (prior.isSetTasks()) {
- // It is an expected invariant that an operation may reference a task (identified by
- // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations,
- // the most recent task definition overrides the prior operation.
- Map<String, ScheduledTask> coalesced = Maps.newHashMap();
- for (ScheduledTask task : prior.getTasks()) {
- coalesced.put(task.getAssignedTask().getTaskId(), task);
- }
- for (ScheduledTask task : next.getTasks()) {
- coalesced.put(task.getAssignedTask().getTaskId(), task);
- }
- prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
- } else {
- prior.setTasks(next.getTasks());
- }
- }
- }
-
- private void coalesce(RemoveTasks prior, RemoveTasks next) {
- if (next.isSetTaskIds()) {
- if (prior.isSetTaskIds()) {
- prior.setTaskIds(ImmutableSet.<String>builder()
- .addAll(prior.getTaskIds())
- .addAll(next.getTaskIds())
- .build());
- } else {
- prior.setTaskIds(next.getTaskIds());
- }
- }
- }
-
- private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
- if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
- prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
- return true;
- }
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
deleted file mode 100644
index a51fd18..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.scheduler.log.Log;
-
-/**
- * Manages a single log stream append transaction. Local storage ops can be added to the
- * transaction and then later committed as an atomic unit.
- */
-interface StreamTransaction {
- /**
- * Appends any ops that have been added to this transaction to the log stream in a single
- * atomic record.
- *
- * @return The position of the log entry committed in this transaction, if any.
- * @throws CodingException If there was a problem encoding a log entry for commit.
- */
- Log.Position commit() throws ThriftBinaryCodec.CodingException;
-
- /**
- * Adds a local storage operation to this transaction.
- *
- * @param op The local storage op to add.
- */
- void add(Op op);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index cb38f10..4d210b2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -16,13 +16,8 @@ package org.apache.aurora.scheduler.storage.log;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -61,25 +56,13 @@ import org.junit.Test;
import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
public class LogManagerTest extends EasyMockTest {
private static final Amount<Integer, Data> NO_FRAMES_EVER_SIZE =
Amount.of(Integer.MAX_VALUE, Data.GB);
- private static final Function<LogEntry, byte[]> ENCODER = entry -> {
- try {
- return encode(entry);
- } catch (CodingException e) {
- throw new RuntimeException(e);
- }
- };
-
private Stream stream;
private Position position1;
private Position position2;
@@ -139,41 +122,10 @@ public class LogManagerTest extends EasyMockTest {
}
@Test
- public void testStreamManagerSuccessiveCommits() throws CodingException {
- control.replay();
-
- StreamManager streamManager = createNoMessagesStreamManager();
- StreamTransaction streamTransaction = streamManager.startTransaction();
- streamTransaction.commit();
-
- assertNotSame("Expected a new transaction to be started after a commit",
- streamTransaction, streamManager.startTransaction());
- }
-
- @Test
public void testTransactionEmpty() throws CodingException {
control.replay();
- Position position = createNoMessagesStreamManager().startTransaction().commit();
- assertNull(position);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testTransactionDoubleCommit() throws CodingException {
- control.replay();
-
- StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
- streamTransaction.commit();
- streamTransaction.commit();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testTransactionAddAfterCommit() throws CodingException {
- control.replay();
-
- StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
- streamTransaction.commit();
- streamTransaction.add(Op.saveFrameworkId(new SaveFrameworkId("don't allow this")));
+ createNoMessagesStreamManager().commit(ImmutableList.of());
}
private static class LogEntryMatcher implements IArgumentMatcher {
@@ -228,12 +180,7 @@ public class LogManagerTest extends EasyMockTest {
StreamManager streamManager = createNoMessagesStreamManager();
control.replay();
- StreamTransaction transaction = streamManager.startTransaction();
- transaction.add(saveFrameworkId);
- transaction.add(deleteJob);
-
- Position position = transaction.commit();
- assertSame(position1, position);
+ streamManager.commit(ImmutableList.of(saveFrameworkId, deleteJob));
}
static class Message {
@@ -281,97 +228,7 @@ public class LogManagerTest extends EasyMockTest {
StreamManager streamManager = createStreamManager(message.chunkSize);
control.replay();
- StreamTransaction transaction = streamManager.startTransaction();
- transaction.add(saveFrameworkId);
-
- Position position = transaction.commit();
- assertSame(position1, position);
- }
-
- @Test
- public void testConcurrentWrites() throws Exception {
- control.replay(); // No easymock expectations used here
-
- Op op1 = Op.removeJob(new RemoveJob(JobKeys.from("r1", "env", "name").newBuilder()));
- final Op op2 = Op.removeJob(new RemoveJob(JobKeys.from("r2", "env", "name").newBuilder()));
-
- LogEntry transaction1 = createLogEntry(op1);
- LogEntry transaction2 = createLogEntry(op2);
-
- final CountDownLatch message1Started = new CountDownLatch(1);
-
- Message message1 = frame(transaction1);
- Message message2 = frame(transaction2);
-
- List<byte[]> expectedAppends =
- ImmutableList.<byte[]>builder()
- .add(encode(message1.header))
- .addAll(Iterables.transform(message1.chunks, ENCODER))
- .add(encode(message2.header))
- .addAll(Iterables.transform(message2.chunks, ENCODER))
- .build();
-
- final Deque<byte[]> actualAppends = new LinkedBlockingDeque<>();
-
- Stream mockStream = new Stream() {
- @Override
- public Position append(byte[] contents) throws StreamAccessException {
- actualAppends.addLast(contents);
- message1Started.countDown();
- try {
- // If a chunked message is not properly serialized to the log, this sleep all but ensures
- // interleaved chunk writes and a test failure.
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return null;
- }
-
- @Override
- public Iterator<Entry> readAll() throws InvalidPositionException, StreamAccessException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void truncateBefore(Position position)
- throws InvalidPositionException, StreamAccessException {
- throw new UnsupportedOperationException();
- }
- };
-
- final StreamManagerImpl streamManager = new StreamManagerImpl(
- mockStream,
- new EntrySerializer.EntrySerializerImpl(message1.chunkSize, Hashing.md5()),
- Hashing.md5(),
- new SnapshotDeduplicatorImpl());
- StreamTransaction tr1 = streamManager.startTransaction();
- tr1.add(op1);
-
- Thread snapshotThread = new Thread() {
- @Override
- public void run() {
- StreamTransaction tr2 = streamManager.startTransaction();
- tr2.add(op2);
- try {
- message1Started.await();
- tr2.commit();
- } catch (CodingException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- };
- snapshotThread.setDaemon(true);
- snapshotThread.start();
-
- tr1.commit();
-
- snapshotThread.join();
-
- assertEquals(expectedAppends.size(), actualAppends.size());
- for (byte[] expectedData : expectedAppends) {
- assertArrayEquals(expectedData, actualAppends.removeFirst());
- }
+ streamManager.commit(ImmutableList.of(saveFrameworkId));
}
@Test