You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/01/13 16:48:44 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6563

Repository: activemq
Updated Branches:
  refs/heads/master 8f960ea35 -> 281d600ae


https://issues.apache.org/jira/browse/AMQ-6563

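Ensure that the index write lock is always released in the load method: the IOHelper.mkdirs(directory) call is moved inside the try block, so that a failure while creating the directory no longer bypasses the lock release.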

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/281d600a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/281d600a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/281d600a

Branch: refs/heads/master
Commit: 281d600ae2f9ba6c6bc7bee0e8025698b9a76563
Parents: 8f960ea
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 13 11:48:19 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 13 11:48:19 2017 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 126 ++++++++++---------
 1 file changed, 64 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/281d600a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index c46a127..45d4220 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -148,13 +148,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         protected Location producerSequenceIdTrackerLocation = null;
         protected Location ackMessageFileMapLocation = null;
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
-        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
         protected int version = VERSION;
         protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
 
         public void read(DataInput is) throws IOException {
             state = is.readInt();
-            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
+            destinations = new BTreeIndex<>(pageFile, is.readLong());
             if (is.readBoolean()) {
                 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
             } else {
@@ -317,7 +317,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                         page.set(metadata);
                         metadata.page = page;
                         metadata.state = CLOSED_STATE;
-                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
+                        metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());
 
                         tx.store(metadata.page, metadataMarshaller, true);
                     } else {
@@ -468,8 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     public void load() throws IOException {
         this.indexLock.writeLock().lock();
-        IOHelper.mkdirs(directory);
         try {
+            IOHelper.mkdirs(directory);
             if (deleteAllMessages) {
                 getJournal().start();
                 getJournal().delete();
@@ -488,7 +488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     public void close() throws IOException, InterruptedException {
-        if( opened.compareAndSet(true, false)) {
+        if (opened.compareAndSet(true, false)) {
             checkpointLock.writeLock().lock();
             try {
                 if (metadata.page != null) {
@@ -577,7 +577,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             int add;
             int remove;
         }
-        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
+        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>();
 
         @SuppressWarnings("rawtypes")
         public void track(Operation operation) {
@@ -620,7 +620,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     @SuppressWarnings("rawtypes")
     public String getTransactions() {
 
-        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
+        ArrayList<TranInfo> infos = new ArrayList<>();
         synchronized (inflightTransactions) {
             if (!inflightTransactions.isEmpty()) {
                 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
@@ -716,8 +716,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             });
 
             // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
-            Set<TransactionId> toRollback = new HashSet<TransactionId>();
-            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
+            Set<TransactionId> toRollback = new HashSet<>();
+            Set<TransactionId> toDiscard = new HashSet<>();
             synchronized (inflightTransactions) {
                 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                     TransactionId id = it.next();
@@ -824,7 +824,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         for (String key : storedDestinations.keySet()) {
             StoredDestination sd = storedDestinations.get(key);
 
-            final ArrayList<Long> matches = new ArrayList<Long>();
+            final ArrayList<Long> matches = new ArrayList<>();
             // Find all the Locations that are >= than the last Append Location.
             sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                 @Override
@@ -891,7 +891,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
             });
         }
-        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
+        HashSet<Integer> missingJournalFiles = new HashSet<>();
         while (!ss.isEmpty()) {
             missingJournalFiles.add((int) ss.removeFirst());
         }
@@ -909,8 +909,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             LOG.warn("Some journal files are missing: " + missingJournalFiles);
         }
 
-        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
-        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
+        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
+        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
         for (Integer missing : missingJournalFiles) {
             missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
         }
@@ -924,7 +924,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
                 while (seq != null) {
                     BTreeVisitor.BetweenVisitor<Location, Long> visitor =
-                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
+                        new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                     missingPredicates.add(visitor);
                     knownCorruption.add(visitor);
                     seq = seq.getNext();
@@ -935,7 +935,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if (!missingPredicates.isEmpty()) {
             for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                 final StoredDestination sd = sdEntry.getValue();
-                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
+                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
                 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                     @Override
                     protected void matched(Location key, Long value) {
@@ -1413,7 +1413,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
-        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+        final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
 
         // Mark the current journal file as a compacted file so that gc checks can skip
         // over logs that are smaller compaction type logs.
@@ -1431,7 +1431,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     // /////////////////////////////////////////////////////////////////
 
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
-    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
 
     long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@@ -1588,7 +1588,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
         Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
         if (referenceFileIds == null) {
-            referenceFileIds = new HashSet<Integer>();
+            referenceFileIds = new HashSet<>();
             referenceFileIds.add(messageLocation.getDataFileId());
             metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
         } else {
@@ -1722,8 +1722,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         if (cleanup) {
 
-            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
+            final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
+            final TreeSet<Integer> gcCandidateSet = new TreeSet<>(completeFileSet);
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
@@ -1949,7 +1949,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         public void run() {
 
             int journalToAdvance = -1;
-            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
+            Set<Integer> journalLogsReferenced = new HashSet<>();
 
             //flag to know whether the ack forwarding completed without an exception
             boolean forwarded = false;
@@ -1979,7 +1979,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     // Map keys might not be sorted, find the earliest log file to forward acks
                     // from and move only those, future cycles can chip away at more as needed.
                     // We won't move files that are themselves rewritten on a previous compaction.
-                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+                    List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet());
                     Collections.sort(journalFileIds);
                     for (Integer journalFileId : journalFileIds) {
                         DataFile current = journal.getDataFileById(journalFileId);
@@ -2037,7 +2037,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
         LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
 
-        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
+        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>();
 
         try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
             KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
@@ -2076,7 +2076,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
             Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
             if (referenceFileIds == null) {
-                referenceFileIds = new HashSet<Integer>();
+                referenceFileIds = new HashSet<>();
                 referenceFileIds.addAll(entry.getValue());
                 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
             } else {
@@ -2171,7 +2171,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
-    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>();
 
     static class MessageKeys {
         final String messageId;
@@ -2280,8 +2280,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         ListIndex<String, Location> subLocations;
 
         // Transient data used to track which Messages are no longer needed.
-        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
-        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
+        final TreeMap<Long, Long> messageReferences = new TreeMap<>();
+        final HashSet<String> subscriptionCache = new LinkedHashSet<>();
 
         public void trackPendingAdd(Long seq) {
             orderIndex.trackPendingAdd(seq);
@@ -2304,26 +2304,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         @Override
         public StoredDestination readPayload(final DataInput dataIn) throws IOException {
             final StoredDestination value = new StoredDestination();
-            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
-            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
-            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
+            value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
+            value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
+            value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
 
             if (dataIn.readBoolean()) {
-                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
-                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
+                value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong());
                 if (metadata.version >= 4) {
-                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
+                    value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong());
                 } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         @Override
                         public void execute(Transaction tx) throws IOException {
-                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
+                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>();
 
                             if (metadata.version >= 3) {
                                 // migrate
                                 BTreeIndex<Long, HashSet<String>> oldAckPositions =
-                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
+                                        new BTreeIndex<>(pageFile, dataIn.readLong());
                                 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                                 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                                 oldAckPositions.load(tx);
@@ -2348,7 +2348,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             }
                             // Now move the pending messages to ack data into the store backed
                             // structure.
-                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+                            value.ackPositions = new ListIndex<>(pageFile, tx.allocate());
                             value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
                             value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
                             value.ackPositions.load(tx);
@@ -2361,13 +2361,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 }
 
                 if (metadata.version >= 5) {
-                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
+                    value.subLocations = new ListIndex<>(pageFile, dataIn.readLong());
                 } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         @Override
                         public void execute(Transaction tx) throws IOException {
-                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
+                            value.subLocations = new ListIndex<>(pageFile, tx.allocate());
                             value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
                             value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
                             value.subLocations.load(tx);
@@ -2376,19 +2376,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 }
             }
             if (metadata.version >= 2) {
-                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
-                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+                value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
+                value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
             } else {
                 // upgrade
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     @Override
                     public void execute(Transaction tx) throws IOException {
-                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                         value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                         value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                         value.orderIndex.lowPriorityIndex.load(tx);
 
-                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                         value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                         value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                         value.orderIndex.highPriorityIndex.load(tx);
@@ -2471,14 +2471,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             // Brand new destination.. allocate indexes for it.
             rc = new StoredDestination();
             rc.orderIndex.allocate(tx);
-            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
-            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
+            rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate());
+            rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate());
 
             if (topic) {
-                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
-                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
-                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
-                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
+                rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate());
+                rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate());
+                rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
+                rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
         }
@@ -2532,7 +2532,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
             rc.subLocations.load(tx);
 
-            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
+            rc.subscriptionCursors = new HashMap<>();
 
             if (metadata.version < 3) {
 
@@ -2696,7 +2696,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * KahaDestination key
      */
     protected final ConcurrentMap<String, MessageStore> storeCache =
-            new ConcurrentHashMap<String, MessageStore>();
+            new ConcurrentHashMap<>();
 
     /**
      * Locate the storeMessageSize counter for this KahaDestination
@@ -2871,7 +2871,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 return;
             }
 
-            ArrayList<Long> unreferenced = new ArrayList<Long>();
+            ArrayList<Long> unreferenced = new ArrayList<>();
 
             for(Long sequenceId : sequences) {
                 Long references = sd.messageReferences.get(sequenceId);
@@ -2889,7 +2889,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
             for(Long sequenceId : unreferenced) {
                 // Find all the entries that need to get deleted.
-                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
+                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
                 sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
 
                 // Do the actual deletes.
@@ -2941,7 +2941,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 }
 
                 // Find all the entries that need to get deleted.
-                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
+                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
                 sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
 
                 // Do the actual deletes.
@@ -3001,11 +3001,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     // Transaction related implementation methods.
     // /////////////////////////////////////////////////////////////////
     @SuppressWarnings("rawtypes")
-    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
     @SuppressWarnings("rawtypes")
-    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
-    protected final Set<String> ackedAndPrepared = new HashSet<String>();
-    protected final Set<String> rolledBackAcks = new HashSet<String>();
+    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
+    protected final Set<String> ackedAndPrepared = new HashSet<>();
+    protected final Set<String> rolledBackAcks = new HashSet<>();
 
     // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
     // till then they are skipped by the store.
@@ -3201,6 +3201,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * @deprecated use {@link #getJournalDiskSyncStrategy} instead
      * @return
      */
+    @Deprecated
     public boolean isEnableJournalDiskSyncs() {
         return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
                 journalDiskSyncStrategy.trim().toUpperCase());
@@ -3210,6 +3211,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * @deprecated use {@link #setEnableJournalDiskSyncs} instead
      * @param syncWrites
      */
+    @Deprecated
     public void setEnableJournalDiskSyncs(boolean syncWrites) {
         if (syncWrites) {
             journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
@@ -3486,7 +3488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         Long lastHighKey;
         Long lastLowKey;
         byte lastGetPriority;
-        final List<Long> pendingAdditions = new LinkedList<Long>();
+        final List<Long> pendingAdditions = new LinkedList<>();
         final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
 
         MessageKeys remove(Transaction tx, Long key) throws IOException {
@@ -3513,16 +3515,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
 
         void allocate(Transaction tx) throws IOException {
-            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+            defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
             if (metadata.version >= 2) {
-                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
+                highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
             }
         }
 
         void configureLast(Transaction tx) throws IOException {
             // Figure out the next key using the last entry in the destination.
-            TreeSet<Long> orderedSet = new TreeSet<Long>();
+            TreeSet<Long> orderedSet = new TreeSet<>();
 
             addLast(orderedSet, highPriorityIndex, tx);
             addLast(orderedSet, defaultPriorityIndex, tx);