You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:34 UTC
[41/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
deleted file mode 100644
index 74e8c07..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.LogEntry._Fields;
-import com.twitter.common.stats.Stats;
-
-/**
- * Utility class for working with log entries.
- */
-final class Entries {
-
- private static final Logger LOG = Logger.getLogger(Entries.class.getName());
-
- private static final AtomicLong COMPRESSION_BYTES_SAVED =
- Stats.exportLong("log_compressed_entry_bytes_saved");
-
- private Entries() {
- // Utility class.
- }
-
- /**
- * Deflates a log entry and wraps it in a deflated entry.
- * <p>
- * This will encode the entry using the thrift binary codec, and will apply deflate compression to
- * the resulting encoded data.
- * <p>
- * This operation is symmetric with {@link #inflate(LogEntry)}.
- *
- * @param entry Entry to deflate.
- * @return An entry with the {@code deflatedEntry} field set with the deflated serialized value
- * of the original entry.
- * @throws CodingException If the value could not be encoded or deflated.
- */
- static LogEntry deflate(LogEntry entry) throws CodingException {
- byte[] data = thriftBinaryEncode(entry);
- int initialLength = data.length;
- LOG.info("Deflating log entry of size " + initialLength);
- ByteArrayOutputStream deflated = new ByteArrayOutputStream();
- DeflaterOutputStream deflater = new DeflaterOutputStream(deflated);
- try {
- deflater.write(data);
- deflater.flush();
- deflater.close();
- byte[] deflatedData = deflated.toByteArray();
- int bytesSaved = initialLength - deflatedData.length;
- if (bytesSaved < 0) {
- LOG.warning("Deflated entry is larger than original by " + (bytesSaved * -1) + " bytes");
- } else {
- LOG.info("Deflated log entry size: " + deflatedData.length + " (saved " + bytesSaved + ")");
- }
-
- COMPRESSION_BYTES_SAVED.addAndGet(bytesSaved);
- return LogEntry.deflatedEntry(ByteBuffer.wrap(deflatedData));
- } catch (IOException e) {
- throw new CodingException("Failed to deflate snapshot: " + e, e);
- }
- }
-
- /**
- * Inflates and deserializes a deflated log entry.
- * <p>
- * This requires that the {@code deflatedEntry} field is set on the provided {@code entry}.
- * The encoded value will be inflated and deserialized as a {@link LogEntry}.
- *
- * @param entry Entry to inflate, which must be a deflated entry.
- * @return The inflated entry.
- * @throws CodingException If the value could not be inflated or decoded.
- */
- static LogEntry inflate(LogEntry entry) throws CodingException {
- Preconditions.checkArgument(entry.isSet(_Fields.DEFLATED_ENTRY));
-
- ByteArrayOutputStream inflated = new ByteArrayOutputStream();
- ByteBuffer data = entry.bufferForDeflatedEntry();
- LOG.info("Inflating deflated log entry of size " + data.remaining());
- InflaterInputStream inflater = new InflaterInputStream(
- new ByteArrayInputStream(data.array(), data.position(), data.remaining()));
- try {
- ByteStreams.copy(inflater, inflated);
- byte[] inflatedData = inflated.toByteArray();
- LOG.info("Inflated log entry size: " + inflatedData.length);
- return thriftBinaryDecode(inflatedData);
- } catch (IOException e) {
- throw new CodingException("Failed to inflate compressed log entry.", e);
- }
- }
-
- /**
- * Thrift binary-encodes a log entry.
- *
- * @param entry The entry to encode.
- * @return The serialized entry value.
- * @throws CodingException If the entry could not be encoded.
- */
- static byte[] thriftBinaryEncode(LogEntry entry) throws CodingException {
- return ThriftBinaryCodec.encodeNonNull(entry);
- }
-
- /**
- * Decodes a byte array containing thrift binary-encoded data.
- *
- * @param contents The data to decode.
- * @return The deserialized entry.
- * @throws CodingException If the entry could not be deserialized.
- */
- static LogEntry thriftBinaryDecode(byte[] contents) throws CodingException {
- return ThriftBinaryCodec.decodeNonNull(LogEntry.class, contents);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
deleted file mode 100644
index da29401..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Bytes;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.storage.Frame;
-import com.twitter.aurora.gen.storage.FrameChunk;
-import com.twitter.aurora.gen.storage.FrameHeader;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.LogEntry._Fields;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.RemoveTasks;
-import com.twitter.aurora.gen.storage.SaveHostAttributes;
-import com.twitter.aurora.gen.storage.SaveTasks;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.Transaction;
-import com.twitter.aurora.gen.storage.storageConstants;
-import com.twitter.aurora.scheduler.log.Log;
-import com.twitter.aurora.scheduler.log.Log.Entry;
-import com.twitter.aurora.scheduler.log.Log.Position;
-import com.twitter.aurora.scheduler.log.Log.Stream;
-import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Manages opening, reading from and writing to a {@link Log}.
- */
-public final class LogManager {
-
- /**
- * Identifies the maximum log entry size to permit before chunking entries into frames.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface MaxEntrySize { }
-
- /**
- * Binding annotation for settings regarding the way snapshots are written.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface SnapshotSetting { }
-
- private static final Logger LOG = Logger.getLogger(LogManager.class.getName());
-
- private final Log log;
- private final Amount<Integer, Data> maxEntrySize;
- private final boolean deflateSnapshots;
- private final ShutdownRegistry shutdownRegistry;
-
- @Inject
- LogManager(
- Log log,
- @MaxEntrySize Amount<Integer, Data> maxEntrySize,
- @SnapshotSetting boolean deflateSnapshots,
- ShutdownRegistry shutdownRegistry) {
-
- this.log = checkNotNull(log);
- this.maxEntrySize = checkNotNull(maxEntrySize);
- this.deflateSnapshots = deflateSnapshots;
- this.shutdownRegistry = checkNotNull(shutdownRegistry);
- }
-
- /**
- * Opens the log for reading and writing.
- *
- * @return A stream manager that can be used to manipulate the log stream.
- * @throws IOException If there is a problem opening the log.
- */
- public StreamManager open() throws IOException {
- final Stream stream = log.open();
- shutdownRegistry.addAction(new ExceptionalCommand<IOException>() {
- @Override public void execute() throws IOException {
- stream.close();
- }
- });
- return new StreamManager(stream, deflateSnapshots, maxEntrySize);
- }
-
- /**
- * Manages interaction with the log stream. Log entries can be
- * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
- * a {@link #startTransaction() transaction} consisting of one or more local storage
- * operations can be committed atomically, or the log can be compacted by
- * {@link #snapshot(com.twitter.aurora.gen.storage.Snapshot) snapshotting}.
- */
- public static class StreamManager {
-
- private static MessageDigest createDigest() {
- try {
- return MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException("Could not find provider for standard algorithm 'MD5'", e);
- }
- }
-
- private static class Vars {
- private final AtomicInteger unSnapshottedTransactions =
- Stats.exportInt("scheduler_log_un_snapshotted_transactions");
- private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
- private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
- private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
- private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
- private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
- private final AtomicLong deflatedEntriesRead =
- Stats.exportLong("scheduler_log_deflated_entries_read");
- private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
- }
- private final Vars vars = new Vars();
-
- private final Object writeMutex = new Object();
- private final Stream stream;
- private final boolean deflateSnapshots;
- private final MessageDigest digest;
- private final EntrySerializer entrySerializer;
-
- StreamManager(Stream stream, boolean deflateSnapshots, Amount<Integer, Data> maxEntrySize) {
- this.stream = checkNotNull(stream);
- this.deflateSnapshots = deflateSnapshots;
- digest = createDigest();
- entrySerializer = new EntrySerializer(digest, maxEntrySize);
- }
-
- /**
- * Reads all entries in the log stream after the given position. If the position
- * supplied is {@code null} then all log entries in the stream will be read.
- *
- * @param reader A reader that will be handed log entries decoded from the stream.
- * @throws CodingException if there was a problem decoding a log entry from the stream.
- * @throws InvalidPositionException if the given position is not found in the log.
- * @throws StreamAccessException if there is a problem reading from the log.
- */
- public void readFromBeginning(Closure<LogEntry> reader)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- Iterator<Entry> entries = stream.readAll();
-
- while (entries.hasNext()) {
- LogEntry logEntry = decodeLogEntry(entries.next());
- while (logEntry != null && isFrame(logEntry)) {
- logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
- }
- if (logEntry != null) {
- if (logEntry.isSet(_Fields.DEFLATED_ENTRY)) {
- logEntry = Entries.inflate(logEntry);
- vars.deflatedEntriesRead.incrementAndGet();
- }
-
- reader.execute(logEntry);
- vars.entriesRead.incrementAndGet();
- }
- }
- }
-
- @Nullable
- private LogEntry tryDecodeFrame(Frame frame, Iterator<Entry> entries) throws CodingException {
- if (!isHeader(frame)) {
- LOG.warning("Found a frame with no preceding header, skipping.");
- return null;
- }
- FrameHeader header = frame.getHeader();
- byte[][] chunks = new byte[header.chunkCount][];
-
- digest.reset();
- for (int i = 0; i < header.chunkCount; i++) {
- if (!entries.hasNext()) {
- logBadFrame(header, i);
- return null;
- }
- LogEntry logEntry = decodeLogEntry(entries.next());
- if (!isFrame(logEntry)) {
- logBadFrame(header, i);
- return logEntry;
- }
- Frame chunkFrame = logEntry.getFrame();
- if (!isChunk(chunkFrame)) {
- logBadFrame(header, i);
- return logEntry;
- }
- byte[] chunkData = chunkFrame.getChunk().getData();
- digest.update(chunkData);
- chunks[i] = chunkData;
- }
- if (!Arrays.equals(header.getChecksum(), digest.digest())) {
- throw new CodingException("Read back a framed log entry that failed its checksum");
- }
- return Entries.thriftBinaryDecode(Bytes.concat(chunks));
- }
-
- private static boolean isFrame(LogEntry logEntry) {
- return logEntry.getSetField() == LogEntry._Fields.FRAME;
- }
-
- private static boolean isChunk(Frame frame) {
- return frame.getSetField() == Frame._Fields.CHUNK;
- }
-
- private static boolean isHeader(Frame frame) {
- return frame.getSetField() == Frame._Fields.HEADER;
- }
-
- private void logBadFrame(FrameHeader header, int chunkIndex) {
- LOG.info(String.format("Found an aborted transaction, required %d frames and found %d",
- header.chunkCount, chunkIndex));
- vars.badFramesRead.incrementAndGet();
- }
-
- private LogEntry decodeLogEntry(Entry entry) throws CodingException {
- byte[] contents = entry.contents();
- vars.bytesRead.addAndGet(contents.length);
- return Entries.thriftBinaryDecode(contents);
- }
-
- /**
- * Truncates all entries in the log stream occuring before the given position. The entry at the
- * given position becomes the first entry in the stream when this call completes.
- *
- * @param position The last position to keep in the stream.
- * @throws InvalidPositionException if the specified position does not exist in this log.
- * @throws StreamAccessException if the stream could not be truncated.
- */
- void truncateBefore(Position position) {
- stream.truncateBefore(position);
- }
-
- /**
- * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
- * atomically.
- *
- * @return StreamTransaction A transaction manager to handle batching up commits to the
- * underlying stream.
- */
- StreamTransaction startTransaction() {
- return new StreamTransaction();
- }
-
- /**
- * Adds a snapshot to the log and if successful, truncates the log entries preceding the
- * snapshot.
- *
- * @param snapshot The snapshot to add.
- * @throws CodingException if the was a problem encoding the snapshot into a log entry.
- * @throws InvalidPositionException if there was a problem truncating before the snapshot.
- * @throws StreamAccessException if there was a problem appending the snapshot to the log.
- */
- @Timed("log_manager_snapshot")
- void snapshot(Snapshot snapshot)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- LogEntry entry = LogEntry.snapshot(snapshot);
- if (deflateSnapshots) {
- entry = Entries.deflate(entry);
- }
-
- Position position = appendAndGetPosition(entry);
- vars.snapshots.incrementAndGet();
- vars.unSnapshottedTransactions.set(0);
- stream.truncateBefore(position);
- }
-
- @Timed("log_manager_append")
- private Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
- Position firstPosition = null;
- byte[][] entries = entrySerializer.serialize(logEntry);
- synchronized (writeMutex) { // ensure all sub-entries are written as a unit
- for (byte[] entry : entries) {
- Position position = stream.append(entry);
- if (firstPosition == null) {
- firstPosition = position;
- }
- vars.bytesWritten.addAndGet(entry.length);
- }
- }
- vars.entriesWritten.incrementAndGet();
- return firstPosition;
- }
-
- @VisibleForTesting
- public static class EntrySerializer {
- private final MessageDigest digest;
- private final int maxEntrySizeBytes;
-
- private EntrySerializer(MessageDigest digest, Amount<Integer, Data> maxEntrySize) {
- this.digest = checkNotNull(digest);
- maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
- }
-
- public EntrySerializer(Amount<Integer, Data> maxEntrySize) {
- this(createDigest(), maxEntrySize);
- }
-
- /**
- * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
- *
- * @param logEntry The log entry to serialize.
- * @return Serialized and chunked log entry.
- * @throws CodingException If the entry could not be serialized.
- */
- @VisibleForTesting
- public byte[][] serialize(LogEntry logEntry) throws CodingException {
- byte[] entry = Entries.thriftBinaryEncode(logEntry);
- if (entry.length <= maxEntrySizeBytes) {
- return new byte[][] {entry};
- }
-
- int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
- byte[][] frames = new byte[chunks + 1][];
-
- frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
- for (int i = 0; i < chunks; i++) {
- int offset = i * maxEntrySizeBytes;
- ByteBuffer chunk =
- ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
- frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
- }
- return frames;
- }
-
- private byte[] checksum(byte[] data) {
- digest.reset();
- return digest.digest(data);
- }
-
- private static byte[] encode(Frame frame) throws CodingException {
- return Entries.thriftBinaryEncode(LogEntry.frame(frame));
- }
- }
-
- /**
- * Manages a single log stream append transaction. Local storage ops can be added to the
- * transaction and then later committed as an atomic unit.
- */
- final class StreamTransaction {
- private final Transaction transaction =
- new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
- private final AtomicBoolean committed = new AtomicBoolean(false);
-
- private StreamTransaction() {
- // supplied by factory method
- }
-
- /**
- * Appends any ops that have been added to this transaction to the log stream in a single
- * atomic record.
- *
- * @return The position of the log entry committed in this transaction, if any.
- * @throws CodingException If there was a problem encoding a log entry for commit.
- */
- Position commit() throws CodingException {
- Preconditions.checkState(!committed.getAndSet(true),
- "Can only call commit once per transaction.");
-
- if (!transaction.isSetOps()) {
- return null;
- }
-
- Position position = appendAndGetPosition(LogEntry.transaction(transaction));
- vars.unSnapshottedTransactions.incrementAndGet();
- return position;
- }
-
- /**
- * Adds a local storage operation to this transaction.
- *
- * @param op The local storage op to add.
- */
- void add(Op op) {
- Preconditions.checkState(!committed.get());
-
- Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
- if (prior == null || !coalesce(prior, op)) {
- transaction.addToOps(op);
- }
- }
-
- /**
- * Tries to coalesce a new op into the prior to compact the binary representation and increase
- * batching.
- *
- * <p>Its recommended that as new {@code Op}s are added, they be treated here although they
- * need not be</p>
- *
- * @param prior The previous op.
- * @param next The next op to be added.
- * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
- */
- private boolean coalesce(Op prior, Op next) {
- if (!prior.isSet() && !next.isSet()) {
- return false;
- }
-
- Op._Fields priorType = prior.getSetField();
- if (!priorType.equals(next.getSetField())) {
- return false;
- }
-
- switch (priorType) {
- case SAVE_FRAMEWORK_ID:
- prior.setSaveFrameworkId(next.getSaveFrameworkId());
- return true;
-
- case SAVE_ACCEPTED_JOB:
- case REMOVE_JOB:
- case SAVE_QUOTA:
- case REMOVE_QUOTA:
- return false;
-
- case SAVE_TASKS:
- coalesce(prior.getSaveTasks(), next.getSaveTasks());
- return true;
- case REMOVE_TASKS:
- coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
- return true;
- case SAVE_HOST_ATTRIBUTES:
- return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
- default:
- LOG.warning("Unoptimized op: " + priorType);
- return false;
- }
- }
-
- private void coalesce(SaveTasks prior, SaveTasks next) {
- if (next.isSetTasks()) {
- if (prior.isSetTasks()) {
- // It is an expected invariant that an operation may reference a task (identified by
- // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations,
- // the most recent task definition overrides the prior operation.
- Map<String, ScheduledTask> coalesced = Maps.newHashMap();
- for (ScheduledTask task : prior.getTasks()) {
- coalesced.put(task.getAssignedTask().getTaskId(), task);
- }
- for (ScheduledTask task : next.getTasks()) {
- coalesced.put(task.getAssignedTask().getTaskId(), task);
- }
- prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
- } else {
- prior.setTasks(next.getTasks());
- }
- }
- }
-
- private void coalesce(RemoveTasks prior, RemoveTasks next) {
- if (next.isSetTaskIds()) {
- if (prior.isSetTaskIds()) {
- prior.setTaskIds(ImmutableSet.<String>builder()
- .addAll(prior.getTaskIds())
- .addAll(next.getTaskIds())
- .build());
- } else {
- prior.setTaskIds(next.getTaskIds());
- }
- }
- }
-
- private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
- if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
- prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
- return true;
- }
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
deleted file mode 100644
index 74f06aa..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.RemoveJob;
-import com.twitter.aurora.gen.storage.RemoveLock;
-import com.twitter.aurora.gen.storage.RemoveQuota;
-import com.twitter.aurora.gen.storage.RemoveTasks;
-import com.twitter.aurora.gen.storage.RewriteTask;
-import com.twitter.aurora.gen.storage.SaveAcceptedJob;
-import com.twitter.aurora.gen.storage.SaveFrameworkId;
-import com.twitter.aurora.gen.storage.SaveHostAttributes;
-import com.twitter.aurora.gen.storage.SaveLock;
-import com.twitter.aurora.gen.storage.SaveQuota;
-import com.twitter.aurora.gen.storage.SaveTasks;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
-import com.twitter.aurora.scheduler.storage.ForwardingStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager;
-import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager.StreamTransaction;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Closure;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A storage implementation that ensures committed transactions are written to a log.
- *
- * <p>In the classic write-ahead log usage we'd perform mutations as follows:
- * <ol>
- * <li>write op to log</li>
- * <li>perform op locally</li>
- * <li>*checkpoint</li>
- * </ol>
- *
- * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
- * have a record of our mutation in case we should need to recover state later after a crash or on
- * a new host (assuming the log is distributed). We then apply the mutation to a local (in-memory)
- * data structure for serving fast read requests and then optionally write down the position of the
- * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
- * crash. Instead of reading the whole log, we can read all entries past the checkpoint. This
- * design implies that all mutations must be idempotent and free from constraint and thus
- * replayable over newer operations when recovering from an old checkpoint.
- *
- * <p>The important detail in our case is the possibility of writing an op to the log, and then
- * failing to commit locally since we use a local database instead of an in-memory data structure.
- * If we die after such a failure, then another instance can read and apply the logged op
- * erroneously.
- *
- * <p>This implementation leverages a local transaction to handle this:
- * <ol>
- * <li>start local transaction</li>
- * <li>perform op locally (uncommitted!)</li>
- * <li>write op to log</li>
- * <li>commit local transaction</li>
- * <li>*checkpoint</li>
- * </ol>
- *
- * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
- * fails to apply to the log, it'll throw and abort the local storage transaction as well.
- */
-public class LogStorage extends ForwardingStore
- implements NonVolatileStorage, DistributedSnapshotStore {
-
- /**
- * A service that can schedule an action to be executed periodically.
- */
- @VisibleForTesting
- interface SchedulingService {
-
- /**
- * Schedules an action to execute periodically.
- *
- * @param interval The time period to wait until running the {@code action} again.
- * @param action The action to execute periodically.
- */
- void doEvery(Amount<Long, Time> interval, Runnable action);
- }
-
- private static class ScheduledExecutorSchedulingService implements SchedulingService {
- private final ScheduledExecutorService scheduledExecutor;
-
- ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
- Amount<Long, Time> shutdownGracePeriod) {
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- shutdownRegistry.addAction(
- new ExecutorServiceShutdown(scheduledExecutor, shutdownGracePeriod));
- }
-
- @Override
- public void doEvery(Amount<Long, Time> interval, Runnable action) {
- checkNotNull(interval);
- checkNotNull(action);
-
- long delay = interval.getValue();
- TimeUnit timeUnit = interval.getUnit().getTimeUnit();
- scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
- }
- }
-
- private static final Logger LOG = Logger.getLogger(LogStorage.class.getName());
-
- private final LogManager logManager;
- private final SchedulingService schedulingService;
- private final SnapshotStore<Snapshot> snapshotStore;
- private final Amount<Long, Time> snapshotInterval;
-
- private StreamManager streamManager;
-
- private boolean recovered = false;
- private StreamTransaction transaction = null;
-
- private final MutableStoreProvider logStoreProvider = new MutableStoreProvider() {
- @Override public SchedulerStore.Mutable getSchedulerStore() {
- return LogStorage.this;
- }
-
- @Override public JobStore.Mutable getJobStore() {
- return LogStorage.this;
- }
-
- @Override public TaskStore getTaskStore() {
- return LogStorage.this;
- }
-
- @Override public TaskStore.Mutable getUnsafeTaskStore() {
- return LogStorage.this;
- }
-
- @Override public LockStore.Mutable getLockStore() {
- return LogStorage.this;
- }
-
- @Override public QuotaStore.Mutable getQuotaStore() {
- return LogStorage.this;
- }
-
- @Override public AttributeStore.Mutable getAttributeStore() {
- return LogStorage.this;
- }
- };
-
- /**
- * Identifies the grace period to give in-process snapshots and checkpoints to complete during
- * shutdown.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface ShutdownGracePeriod { }
-
- /**
- * Identifies the interval between snapshots of local storage truncating the log.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface SnapshotInterval { }
-
- /**
- * Identifies a local storage layer that is written to only after first ensuring the write
- * operation is persisted in the log.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface WriteBehind { }
-
- @Inject
- LogStorage(LogManager logManager,
- ShutdownRegistry shutdownRegistry,
- @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
- SnapshotStore<Snapshot> snapshotStore,
- @SnapshotInterval Amount<Long, Time> snapshotInterval,
- @WriteBehind Storage storage,
- @WriteBehind SchedulerStore.Mutable schedulerStore,
- @WriteBehind JobStore.Mutable jobStore,
- @WriteBehind TaskStore.Mutable taskStore,
- @WriteBehind LockStore.Mutable lockStore,
- @WriteBehind QuotaStore.Mutable quotaStore,
- @WriteBehind AttributeStore.Mutable attributeStore) {
-
- this(logManager,
- new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
- snapshotStore,
- snapshotInterval,
- storage,
- schedulerStore,
- jobStore,
- taskStore,
- lockStore,
- quotaStore,
- attributeStore);
- }
-
- @VisibleForTesting
- LogStorage(LogManager logManager,
- SchedulingService schedulingService,
- SnapshotStore<Snapshot> snapshotStore,
- Amount<Long, Time> snapshotInterval,
- Storage storage,
- SchedulerStore.Mutable schedulerStore,
- JobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- LockStore.Mutable lockStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore) {
-
- super(storage, schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
- this.logManager = checkNotNull(logManager);
- this.schedulingService = checkNotNull(schedulingService);
- this.snapshotStore = checkNotNull(snapshotStore);
- this.snapshotInterval = checkNotNull(snapshotInterval);
- }
-
- @Override
- public synchronized void prepare() {
- // Open the log to make a log replica available to the scheduler group.
- try {
- streamManager = logManager.open();
- } catch (IOException e) {
- throw new IllegalStateException("Failed to open the log, cannot continue", e);
- }
-
- // TODO(John Sirois): start incremental recovery here from the log and do a final recovery
- // catchup in start after shutting down the incremental syncer.
- }
-
- @Override
- public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- // Must have the underlying storage started so we can query it for the last checkpoint.
- // We replay these entries in the forwarded storage system's transactions but not ours - we
- // do not want to re-record these ops to the log.
- recover();
- recovered = true;
-
- // Now that we're recovered we should let any mutations done in initializationLogic append
- // to the log, so run it in one of our transactions.
- write(initializationLogic);
- }
- });
-
- scheduleSnapshots();
- }
-
- @Override
- public void stop() {
- // No-op.
- }
-
- @Timed("scheduler_log_recover")
- void recover() throws RecoveryFailedException {
- try {
- streamManager.readFromBeginning(new Closure<LogEntry>() {
- @Override public void execute(LogEntry logEntry) {
- replay(logEntry);
- }
- });
- } catch (CodingException | InvalidPositionException | StreamAccessException e) {
- throw new RecoveryFailedException(e);
- }
- }
-
- private static final class RecoveryFailedException extends SchedulerException {
- private RecoveryFailedException(Throwable cause) {
- super(cause);
- }
- }
-
- void replay(final LogEntry logEntry) {
- switch (logEntry.getSetField()) {
- case SNAPSHOT:
- Snapshot snapshot = logEntry.getSnapshot();
- LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
- snapshotStore.applySnapshot(snapshot);
- break;
-
- case TRANSACTION:
- for (Op op : logEntry.getTransaction().getOps()) {
- replayOp(op);
- }
- break;
-
- case NOOP:
- // Nothing to do here
- break;
-
- case DEFLATED_ENTRY:
- throw new IllegalArgumentException("Deflated entries are not handled at this layer.");
-
- case FRAME:
- throw new IllegalArgumentException("Framed entries are not handled at this layer.");
-
- default:
- throw new IllegalStateException("Unknown log entry type: " + logEntry);
- }
- }
-
- private void replayOp(Op op) {
- switch (op.getSetField()) {
- case SAVE_FRAMEWORK_ID:
- saveFrameworkId(op.getSaveFrameworkId().getId());
- break;
-
- case SAVE_ACCEPTED_JOB:
- SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
- saveAcceptedJob(
- acceptedJob.getManagerId(),
- IJobConfiguration.build(acceptedJob.getJobConfig()));
- break;
-
- case REMOVE_JOB:
- removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
- break;
-
- case SAVE_TASKS:
- saveTasks(IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
- break;
-
- case REWRITE_TASK:
- RewriteTask rewriteTask = op.getRewriteTask();
- unsafeModifyInPlace(rewriteTask.getTaskId(), ITaskConfig.build(rewriteTask.getTask()));
- break;
-
- case REMOVE_TASKS:
- deleteTasks(op.getRemoveTasks().getTaskIds());
- break;
-
- case SAVE_QUOTA:
- SaveQuota saveQuota = op.getSaveQuota();
- saveQuota(saveQuota.getRole(), IQuota.build(saveQuota.getQuota()));
- break;
-
- case REMOVE_QUOTA:
- removeQuota(op.getRemoveQuota().getRole());
- break;
-
- case SAVE_HOST_ATTRIBUTES:
- saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
- break;
-
- case SAVE_LOCK:
- saveLock(ILock.build(op.getSaveLock().getLock()));
- break;
-
- case REMOVE_LOCK:
- removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
- break;
-
- default:
- throw new IllegalStateException("Unknown transaction op: " + op);
- }
- }
-
- private void scheduleSnapshots() {
- if (snapshotInterval.getValue() > 0) {
- schedulingService.doEvery(snapshotInterval, new Runnable() {
- @Override public void run() {
- try {
- snapshot();
- } catch (StorageException e) {
- if (e.getCause() != null) {
- LOG.log(Level.WARNING, e.getMessage(), e.getCause());
- } else {
- LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e);
- }
- }
- }
- });
- }
- }
-
- /**
- * Forces a snapshot of the storage state.
- *
- * @throws CodingException If there is a problem encoding the snapshot.
- * @throws InvalidPositionException If the log stream cursor is invalid.
- * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
- */
- @Timed("scheduler_log_snapshot")
- void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
- super.write(new MutateWork.NoResult<CodingException>() {
- @Override protected void execute(MutableStoreProvider unused)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- persist(snapshotStore.createSnapshot());
- }
- });
- }
-
- @Timed("scheduler_log_snapshot_persist")
- @Override
- public void persist(Snapshot snapshot)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- streamManager.snapshot(snapshot);
- }
-
- @Override
- public synchronized <T, E extends Exception> T write(final MutateWork<T, E> work)
- throws StorageException, E {
-
- // We don't want to use the log when recovering from it, we just want to update the underlying
- // store - so pass mutations straight through to the underlying storage.
- if (!recovered) {
- return super.write(work);
- }
-
- // The log stream transaction has already been set up so we just need to delegate with our
- // store provider so any mutations performed by work get logged.
- if (transaction != null) {
- return super.write(new MutateWork<T, E>() {
- @Override public T apply(MutableStoreProvider unused) throws E {
- return work.apply(logStoreProvider);
- }
- });
- }
-
- transaction = streamManager.startTransaction();
- try {
- return super.write(new MutateWork<T, E>() {
- @Override public T apply(MutableStoreProvider unused) throws E {
- T result = work.apply(logStoreProvider);
- try {
- transaction.commit();
- } catch (CodingException e) {
- throw new IllegalStateException(
- "Problem encoding transaction operations to the log stream", e);
- } catch (StreamAccessException e) {
- throw new StorageException(
- "There was a problem committing the transaction to the log.", e);
- }
- return result;
- }
- });
- } finally {
- transaction = null;
- }
- }
-
- @Timed("scheduler_log_save_framework_id")
- @Override
- public void saveFrameworkId(final String frameworkId) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
- LogStorage.super.saveFrameworkId(frameworkId);
- }
- });
- }
-
- @Timed("scheduler_log_job_save")
- @Override
- public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
- LogStorage.super.saveAcceptedJob(managerId, jobConfig);
- }
- });
- }
-
- @Timed("scheduler_log_job_remove")
- @Override
- public void removeJob(final IJobKey jobKey) {
- checkNotNull(jobKey);
-
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
- LogStorage.super.removeJob(jobKey);
- }
- });
- }
-
- @Timed("scheduler_log_tasks_save")
- @Override
- public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
- LogStorage.super.saveTasks(newTasks);
- }
- });
- }
-
- @Override
- public void deleteAllTasks() {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- Query.Builder query = Query.unscoped();
- Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- deleteTasks(ids);
- }
- });
- }
-
- @Timed("scheduler_log_tasks_remove")
- @Override
- public void deleteTasks(final Set<String> taskIds) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.removeTasks(new RemoveTasks(taskIds)));
- LogStorage.super.deleteTasks(taskIds);
- }
- });
- }
-
- @Timed("scheduler_log_tasks_mutate")
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- final Query.Builder query,
- final Function<IScheduledTask, IScheduledTask> mutator) {
-
- return write(new MutateWork.Quiet<ImmutableSet<IScheduledTask>>() {
- @Override public ImmutableSet<IScheduledTask> apply(MutableStoreProvider unused) {
- ImmutableSet<IScheduledTask> mutated = LogStorage.super.mutateTasks(query, mutator);
-
- Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Storing updated tasks to log: "
- + Maps.transformValues(tasksById, Tasks.GET_STATUS));
- }
-
- // TODO(William Farner): Avoid writing an op when mutated is empty.
- log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
- return mutated;
- }
- });
- }
-
- @Timed("scheduler_log_unsafe_modify_in_place")
- @Override
- public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
- return write(new MutateWork.Quiet<Boolean>() {
- @Override public Boolean apply(MutableStoreProvider storeProvider) {
- boolean mutated = LogStorage.super.unsafeModifyInPlace(taskId, taskConfiguration);
- if (mutated) {
- log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
- }
- return mutated;
- }
- });
- }
-
- @Timed("scheduler_log_quota_remove")
- @Override
- public void removeQuota(final String role) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.removeQuota(new RemoveQuota(role)));
- LogStorage.super.removeQuota(role);
- }
- });
- }
-
- @Timed("scheduler_log_quota_save")
- @Override
- public void saveQuota(final String role, final IQuota quota) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
- LogStorage.super.saveQuota(role, quota);
- }
- });
- }
-
- @Timed("scheduler_save_host_attribute")
- @Override
- public void saveHostAttributes(final HostAttributes attrs) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- // Pass the updated attributes upstream, and then check if the stored value changes.
- // We do this since different parts of the system write partial HostAttributes objects
- // and they are merged together internally.
- // TODO(William Farner): Split out a separate method
- // saveAttributes(String host, Iterable<Attributes>) to simplify this.
- Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(attrs.getHost());
- LogStorage.super.saveHostAttributes(attrs);
- Optional<HostAttributes> updated = LogStorage.super.getHostAttributes(attrs.getHost());
- if (!saved.equals(updated)) {
- log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
- }
- }
- });
- }
-
- @Timed("scheduler_lock_save")
- @Override
- public void saveLock(final ILock lock) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.saveLock(new SaveLock(lock.newBuilder())));
- LogStorage.super.saveLock(lock);
- }
- });
- }
-
- @Timed("scheduler_lock_remove")
- @Override
- public void removeLock(final ILockKey lockKey) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
- LogStorage.super.removeLock(lockKey);
- }
- });
- }
-
- @Override
- public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
- write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider unused) {
- Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(host);
- if (saved.isPresent()) {
- HostAttributes attributes = saved.get().setMode(mode);
- log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
- LogStorage.super.saveHostAttributes(attributes);
- }
- }
- });
- return false;
- }
-
- @Override
- public void snapshot() throws StorageException {
- try {
- doSnapshot();
- } catch (CodingException e) {
- throw new StorageException("Failed to encode a snapshot", e);
- } catch (InvalidPositionException e) {
- throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
- } catch (StreamAccessException e) {
- throw new StorageException("Failed to create a snapshot", e);
- }
- super.snapshot();
- }
-
- private void log(Op op) {
- if (recovered) {
- transaction.add(op);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
deleted file mode 100644
index 92568c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.lang.annotation.Annotation;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.scheduler.log.Log;
-import com.twitter.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
-import com.twitter.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import com.twitter.aurora.scheduler.storage.log.LogManager.SnapshotSetting;
-import com.twitter.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
-import com.twitter.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-/**
- * Bindings for scheduler distributed log based storage.
- * <p/>
- * Requires bindings for:
- * <ul>
- * <li>{@link Clock}</li>
- * <li>{@link ShutdownRegistry}</li>
- * <li>The concrete {@link Log} implementation.</li>
- * </ul>
- * <p/>
- */
-public class LogStorageModule extends AbstractModule {
-
- @CmdLine(name = "dlog_shutdown_grace_period",
- help = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
- + "actions to complete before forcibly shutting down.")
- private static final Arg<Amount<Long, Time>> SHUTDOWN_GRACE_PERIOD =
- Arg.create(Amount.of(2L, Time.SECONDS));
-
- @CmdLine(name = "dlog_snapshot_interval",
- help = "Specifies the frequency at which snapshots of local storage are taken and "
- + "written to the log.")
- private static final Arg<Amount<Long, Time>> SNAPSHOT_INTERVAL =
- Arg.create(Amount.of(1L, Time.HOURS));
-
- @CmdLine(name = "dlog_max_entry_size",
- help = "Specifies the maximum entry size to append to the log. Larger entries will be "
- + "split across entry Frames.")
- @VisibleForTesting
- public static final Arg<Amount<Integer, Data>> MAX_LOG_ENTRY_SIZE =
- Arg.create(Amount.of(512, Data.KB));
-
- @CmdLine(name = "deflate_snapshots", help = "Whether snapshots should be deflate-compressed.")
- private static final Arg<Boolean> DEFLATE_SNAPSHOTS = Arg.create(true);
-
- @Override
- protected void configure() {
- requireBinding(Log.class);
- requireBinding(Clock.class);
- requireBinding(ShutdownRegistry.class);
-
- bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
- bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
-
- bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
- .toInstance(MAX_LOG_ENTRY_SIZE.get());
- bind(LogManager.class).in(Singleton.class);
- bind(Boolean.class).annotatedWith(SnapshotSetting.class).toInstance(DEFLATE_SNAPSHOTS.get());
-
- bind(LogStorage.class).in(Singleton.class);
- install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
- bind(DistributedSnapshotStore.class).to(LogStorage.class);
- }
-
- private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
- bind(Key.get(new TypeLiteral<Amount<Long, Time>>() { }, key)).toInstance(value.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
deleted file mode 100644
index df6b899..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.Lock;
-import com.twitter.aurora.gen.storage.QuotaConfiguration;
-import com.twitter.aurora.gen.storage.SchedulerMetadata;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.StoredJob;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Volatile;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.util.BuildInfo;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-
-/**
- * Snapshot store implementation that delegates to underlying snapshot stores by
- * extracting/applying fields in a snapshot thrift struct.
- */
-public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
-
- private static final Logger LOG = Logger.getLogger(SnapshotStoreImpl.class.getName());
-
- private static final SnapshotField ATTRIBUTE_FIELD = new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
- snapshot.setHostAttributes(storeProvider.getAttributeStore().getHostAttributes());
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getAttributeStore().deleteHostAttributes();
-
- if (snapshot.isSetHostAttributes()) {
- for (HostAttributes attributes : snapshot.getHostAttributes()) {
- store.getAttributeStore().saveHostAttributes(attributes);
- }
- }
- }
- };
-
- private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
- ATTRIBUTE_FIELD,
- new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setTasks(
- IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getUnsafeTaskStore().deleteAllTasks();
-
- if (snapshot.isSetTasks()) {
- store.getUnsafeTaskStore().saveTasks(
- IScheduledTask.setFromBuilders(snapshot.getTasks()));
- }
- }
- },
- new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- ImmutableSet.Builder<StoredJob> jobs = ImmutableSet.builder();
- for (String managerId : store.getJobStore().fetchManagerIds()) {
- for (IJobConfiguration config : store.getJobStore().fetchJobs(managerId)) {
- jobs.add(new StoredJob(managerId, config.newBuilder()));
- }
- }
- snapshot.setJobs(jobs.build());
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getJobStore().deleteJobs();
-
- if (snapshot.isSetJobs()) {
- for (StoredJob job : snapshot.getJobs()) {
- store.getJobStore().saveAcceptedJob(
- job.getJobManagerId(),
- IJobConfiguration.build(job.getJobConfiguration()));
- }
- }
- }
- },
- new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- Properties props = new BuildInfo().getProperties();
-
- snapshot.setSchedulerMetadata(
- new SchedulerMetadata()
- .setFrameworkId(store.getSchedulerStore().fetchFrameworkId())
- .setRevision(props.getProperty(BuildInfo.Key.GIT_REVISION.value))
- .setTag(props.getProperty(BuildInfo.Key.GIT_TAG.value))
- .setTimestamp(props.getProperty(BuildInfo.Key.TIMESTAMP.value))
- .setUser(props.getProperty(BuildInfo.Key.USER.value))
- .setMachine(props.getProperty(BuildInfo.Key.MACHINE.value))
- .setVersion(CURRENT_API_VERSION));
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- if (snapshot.isSetSchedulerMetadata()) {
- // No delete necessary here since this is a single value.
-
- store.getSchedulerStore()
- .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId());
- }
- }
- },
- new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
- for (Map.Entry<String, IQuota> entry : store.getQuotaStore().fetchQuotas().entrySet()) {
- quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
- }
-
- snapshot.setQuotaConfigurations(quotas.build());
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getQuotaStore().deleteQuotas();
-
- if (snapshot.isSetQuotaConfigurations()) {
- for (QuotaConfiguration quota : snapshot.getQuotaConfigurations()) {
- store.getQuotaStore().saveQuota(quota.getRole(), IQuota.build(quota.getQuota()));
- }
- }
- }
- },
- new SnapshotField() {
- @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
- }
-
- @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getLockStore().deleteLocks();
-
- if (snapshot.isSetLocks()) {
- for (Lock lock : snapshot.getLocks()) {
- store.getLockStore().saveLock(ILock.build(lock));
- }
- }
- }
- }
- );
-
- private final Clock clock;
- private final Storage storage;
-
- @Inject
- public SnapshotStoreImpl(Clock clock, @Volatile Storage storage) {
- this.clock = checkNotNull(clock);
- this.storage = checkNotNull(storage);
- }
-
- @Timed("snapshot_create")
- @Override public Snapshot createSnapshot() {
- return storage.consistentRead(new Work.Quiet<Snapshot>() {
- @Override public Snapshot apply(StoreProvider storeProvider) {
- Snapshot snapshot = new Snapshot();
-
- // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
- // one of the field closures is mean and tries to apply a timestamp.
- long timestamp = clock.nowMillis();
- for (SnapshotField field : SNAPSHOT_FIELDS) {
- field.saveToSnapshot(storeProvider, snapshot);
- }
- snapshot.setTimestamp(timestamp);
- return snapshot;
- }
- });
- }
-
- @Timed("snapshot_apply")
- @Override public void applySnapshot(final Snapshot snapshot) {
- checkNotNull(snapshot);
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- LOG.info("Restoring snapshot.");
-
- for (SnapshotField field : SNAPSHOT_FIELDS) {
- field.restoreFromSnapshot(storeProvider, snapshot);
- }
- }
- });
- }
-
- private interface SnapshotField {
- void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
-
- void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
deleted file mode 100644
index a4c0126..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log.testing;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.easymock.IExpectationSetters;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.Transaction;
-import com.twitter.aurora.gen.storage.storageConstants;
-import com.twitter.aurora.scheduler.log.Log.Position;
-import com.twitter.aurora.scheduler.log.Log.Stream;
-
-import static org.easymock.EasyMock.expect;
-
-/**
- * A junit argument matcher that detects same-value {@link LogEntry} objects in a more human
- * readable way than byte array comparison.
- */
-public class LogOpMatcher implements IArgumentMatcher {
- private final LogEntry expected;
-
- public LogOpMatcher(LogEntry expected) {
- this.expected = expected;
- }
-
- @Override
- public boolean matches(Object argument) {
- try {
- return expected.equals(ThriftBinaryCodec.decodeNonNull(LogEntry.class, (byte[]) argument));
- } catch (CodingException e) {
- return false;
- }
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append(expected);
- }
-
- /**
- * Creates a stream matcher that will set expectations on the provided {@code stream}.
- *
- * @param stream Mocked stream.
- * @return A stream matcher to set expectations against {@code stream}.
- */
- public static StreamMatcher matcherFor(Stream stream) {
- return new StreamMatcher(stream);
- }
-
- public static final class StreamMatcher {
- private final Stream stream;
-
- private StreamMatcher(Stream stream) {
- this.stream = Preconditions.checkNotNull(stream);
- }
-
- /**
- * Sets an expectation for a stream transaction containing the provided {@code ops}.
- *
- * @param ops Operations to expect in the transaction.
- * @return An expectation setter.
- */
- public IExpectationSetters<Position> expectTransaction(Op...ops) {
- LogEntry entry = LogEntry.transaction(
- new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
- return expect(stream.append(sameEntry(entry)));
- }
-
- /**
- * Sets an expectation for a snapshot.
- *
- * @param snapshot Expected snapshot.
- * @return An expectation setter.
- */
- public IExpectationSetters<Position> expectSnapshot(Snapshot snapshot) {
- LogEntry entry = LogEntry.snapshot(snapshot);
- return expect(stream.append(sameEntry(entry)));
- }
- }
-
- /**
- * Creates a matcher that supports value matching between a serialized {@link LogEntry} byte array
- * and a log entry object.
- *
- * @param entry Entry to match against.
- * @return {@code null}, return value included for easymock-style embedding.
- */
- private static byte[] sameEntry(LogEntry entry) {
- EasyMock.reportMatcher(new LogOpMatcher(entry));
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
deleted file mode 100644
index bf05caa..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * An interning pool that can be used to retrieve the canonical instances of objects, while
- * maintaining a reference count to the canonical instances.
- *
- * @param <T> The interned object type.
- * @param <A> The type used for maintaining associations.
- */
-class Interner<T, A> {
-
- private final Map<T, InternEntry<A, T>> pool = Maps.newHashMap();
-
- /**
- * Retrieves the canonical instance of {@code t} and maintains {@code association} with the
- * interned value. If {@code t} was not previously interned, the provided instance is stored.
- *
- * @param t The object to intern, or get the previously-interned value for.
- * @param association A value to associate with {@code t}.
- * @return The interned value, which may be reference-equivalent to {@code t}.
- */
- synchronized T addAssociation(T t, A association) {
- InternEntry<A, T> entry = pool.get(t);
- if (entry == null) {
- entry = new InternEntry<>(t, association);
- pool.put(t, entry);
- } else {
- entry.associations.add(association);
- }
- return entry.interned;
- }
-
- /**
- * Removes an association with an interned value, effectively decrementing the reference count.
- *
- * @param t The interned value that {@code association} was associated with.
- * @param association The association to remove.
- */
- synchronized void removeAssociation(T t, A association) {
- InternEntry<A, T> entry = pool.get(t);
- if (entry != null) {
- entry.associations.remove(association);
- if (entry.associations.isEmpty()) {
- pool.remove(t);
- }
- }
- }
-
- /**
- * Removes all interned values and associations.
- */
- synchronized void clear() {
- pool.clear();
- }
-
- @VisibleForTesting
- synchronized boolean isInterned(T t) {
- return pool.containsKey(t);
- }
-
- @VisibleForTesting
- synchronized Set<A> getAssociations(T t) {
- return ImmutableSet.copyOf(pool.get(t).associations);
- }
-
- private static class InternEntry<A, T> {
- private final T interned;
- private final Set<A> associations = Sets.newHashSet();
-
- InternEntry(T interned, A initialAssociation) {
- this.interned = interned;
- associations.add(initialAssociation);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
deleted file mode 100644
index 6c383c7..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.storage.AttributeStore.Mutable;
-
-/**
- * An in-memory attribute store.
- */
-class MemAttributeStore implements Mutable {
- private final ConcurrentMap<String, HostAttributes> hostAttributes = Maps.newConcurrentMap();
-
- @Override
- public void deleteHostAttributes() {
- hostAttributes.clear();
- }
-
- @Override
- public void saveHostAttributes(HostAttributes attributes) {
- hostAttributes.putIfAbsent(attributes.getHost(), attributes);
-
- HostAttributes stored = hostAttributes.get(attributes.getHost());
- if (!stored.isSetMode()) {
- stored.setMode(attributes.isSetMode() ? attributes.getMode() : MaintenanceMode.NONE);
- }
- stored.setAttributes(attributes.isSetAttributes()
- ? attributes.getAttributes() : ImmutableSet.<Attribute>of());
- }
-
- @Override
- public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
- HostAttributes stored = hostAttributes.get(host);
- if (stored != null) {
- stored.setMode(mode);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public Optional<HostAttributes> getHostAttributes(String host) {
- return Optional.fromNullable(hostAttributes.get(host));
- }
-
- @Override
- public Set<HostAttributes> getHostAttributes() {
- return ImmutableSet.copyOf(hostAttributes.values());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
deleted file mode 100644
index 8268e8d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory job store.
- */
-class MemJobStore implements JobStore.Mutable {
-
- private final LoadingCache<String, Manager> managers = CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Manager>() {
- @Override public Manager load(String key) {
- return new Manager();
- }
- });
-
- @Override
- public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
- checkNotNull(managerId);
- checkNotNull(jobConfig);
-
- IJobKey key = JobKeys.assertValid(jobConfig.getKey());
- managers.getUnchecked(managerId).jobs.put(key, jobConfig);
- }
-
- @Override
- public void removeJob(IJobKey jobKey) {
- checkNotNull(jobKey);
-
- for (Manager manager : managers.asMap().values()) {
- manager.jobs.remove(jobKey);
- }
- }
-
- @Override
- public void deleteJobs() {
- managers.invalidateAll();
- }
-
- @Override
- public Iterable<IJobConfiguration> fetchJobs(String managerId) {
- checkNotNull(managerId);
-
- @Nullable Manager manager = managers.getIfPresent(managerId);
- if (manager == null) {
- return ImmutableSet.of();
- }
-
- synchronized (manager.jobs) {
- return ImmutableSet.copyOf(manager.jobs.values());
- }
- }
-
- @Override
- public Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey) {
- checkNotNull(managerId);
- checkNotNull(jobKey);
-
- Optional<Manager> manager = Optional.fromNullable(managers.getIfPresent(managerId));
- if (!manager.isPresent()) {
- return Optional.absent();
- } else {
- return Optional.fromNullable(manager.get().jobs.get(jobKey));
- }
- }
-
- @Override
- public Set<String> fetchManagerIds() {
- return ImmutableSet.copyOf(managers.asMap().keySet());
- }
-
- private static class Manager {
- private final Map<IJobKey, IJobConfiguration> jobs = Maps.newConcurrentMap();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
deleted file mode 100644
index 1ced973..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-
-/**
- * An in-memory lock store.
- */
-class MemLockStore implements LockStore.Mutable {
-
- private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
-
- @Override
- public void saveLock(ILock lock) {
- locks.put(lock.getKey(), lock);
- }
-
- @Override
- public void removeLock(ILockKey lockKey) {
- locks.remove(lockKey);
- }
-
- @Override
- public void deleteLocks() {
- locks.clear();
- }
-
- @Override
- public Set<ILock> fetchLocks() {
- return ImmutableSet.copyOf(locks.values());
- }
-
- @Override
- public Optional<ILock> fetchLock(ILockKey lockKey) {
- return Optional.fromNullable(locks.get(lockKey));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
deleted file mode 100644
index 855b39c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory quota store.
- */
-class MemQuotaStore implements QuotaStore.Mutable {
-
- private final Map<String, IQuota> quotas = Maps.newConcurrentMap();
-
- @Override
- public void deleteQuotas() {
- quotas.clear();
- }
-
- @Override
- public void removeQuota(String role) {
- checkNotNull(role);
-
- quotas.remove(role);
- }
-
- @Override
- public void saveQuota(String role, IQuota quota) {
- checkNotNull(role);
- checkNotNull(quota);
-
- quotas.put(role, quota);
- }
-
- @Override
- public Optional<IQuota> fetchQuota(String role) {
- checkNotNull(role);
- return Optional.fromNullable(quotas.get(role));
- }
-
- @Override
- public Map<String, IQuota> fetchQuotas() {
- return ImmutableMap.copyOf(quotas);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
deleted file mode 100644
index 19bf4ec..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Atomics;
-
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-
-/**
- * An in-memory scheduler store.
- */
-class MemSchedulerStore implements SchedulerStore.Mutable {
- private final AtomicReference<String> frameworkId = Atomics.newReference();
-
- @Override
- public void saveFrameworkId(String newFrameworkId) {
- frameworkId.set(newFrameworkId);
- }
-
- @Nullable
- @Override
- public String fetchFrameworkId() {
- return frameworkId.get();
- }
-}