You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/07/31 17:26:13 UTC

[GitHub] ivankelly closed pull request #1574: Disallow direct access to LedgerHandle#metadata

ivankelly closed pull request #1574: Disallow direct access to LedgerHandle#metadata
URL: https://github.com/apache/bookkeeper/pull/1574
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff once the pull request is merged, so the full diff is reproduced below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index cd6084841c..ae0e0515d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -69,7 +69,7 @@ void initiate() {
             LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
         }
         // we need to send the request to every bookie in the ensamble
-        this.currentEnsemble = lh.metadata.currentEnsemble;
+        this.currentEnsemble = lh.getLedgerMetadata().currentEnsemble;
         this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
 
         DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index c1440b726f..538cfb71a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -430,7 +430,14 @@ public void safeOperationComplete(int rc,
                                     ensembleUpdatedCb.processResult(rc, null,
                                             null);
                                 } else {
-                                    lh.metadata = newMeta;
+                                    while (true) {
+                                        // temporary change, metadata really shouldn't be updated
+                                        // until the new metadata has been written successfully
+                                        LedgerMetadata currentMetadata = lh.getLedgerMetadata();
+                                        if (lh.setLedgerMetadata(currentMetadata, newMeta)) {
+                                            break;
+                                        }
+                                    }
                                     updateEnsembleInfo(ensembleUpdatedCb,
                                             fragmentStartId, lh, oldBookie2NewBookie);
                                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3a610dfd08..90b98840ab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -102,7 +102,7 @@
     static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
 
     final byte[] ledgerKey;
-    LedgerMetadata metadata;
+    private LedgerMetadata metadata;
     final BookKeeper bk;
     final long ledgerId;
     long lastAddPushed;
@@ -274,7 +274,9 @@ BookKeeper getBk() {
     }
 
     protected void initializeExplicitLacFlushPolicy() {
-        if (!metadata.isClosed() && !(this instanceof ReadOnlyLedgerHandle) && bk.getExplicitLacInterval() > 0) {
+        if (!getLedgerMetadata().isClosed()
+            && !(this instanceof ReadOnlyLedgerHandle)
+            && bk.getExplicitLacInterval() > 0) {
             explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
         } else {
             explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
@@ -333,13 +335,25 @@ public LedgerMetadata getLedgerMetadata() {
         return metadata;
     }
 
+    boolean setLedgerMetadata(LedgerMetadata expected, LedgerMetadata newMetadata) {
+        synchronized (this) {
+            // ensure that we only update the metadata if it is the object we expect it to be
+            if (metadata == expected) {
+                metadata = newMetadata;
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
     /**
      * Get this ledger's customMetadata map.
      *
      * @return map containing user provided customMetadata.
      */
     public Map<String, byte[]> getCustomMetadata() {
-        return metadata.getCustomMetadata();
+        return getLedgerMetadata().getCustomMetadata();
     }
 
     /**
@@ -348,7 +362,7 @@ public LedgerMetadata getLedgerMetadata() {
      * @return the count of fragments
      */
     public synchronized long getNumFragments() {
-        return metadata.getEnsembles().size();
+        return getLedgerMetadata().getEnsembles().size();
     }
 
     /**
@@ -358,7 +372,7 @@ public synchronized long getNumFragments() {
      * @return count of unique bookies
      */
     public synchronized long getNumBookies() {
-        Map<Long, ArrayList<BookieSocketAddress>> m = metadata.getEnsembles();
+        Map<Long, ArrayList<BookieSocketAddress>> m = getLedgerMetadata().getEnsembles();
         Set<BookieSocketAddress> s = Sets.newHashSet();
         for (ArrayList<BookieSocketAddress> aList : m.values()) {
             s.addAll(aList);
@@ -402,7 +416,7 @@ public synchronized long getLength() {
      * @return the ledger creation time
      */
     public long getCtime() {
-        return this.metadata.getCtime();
+        return getLedgerMetadata().getCtime();
     }
 
     /**
@@ -425,10 +439,10 @@ BookiesHealthInfo getBookiesHealthInfo() {
 
     void writeLedgerConfig(GenericCallback<LedgerMetadata> writeCb) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing metadata to ledger manager: {}, {}", this.ledgerId, metadata.getVersion());
+            LOG.debug("Writing metadata to ledger manager: {}, {}", this.ledgerId, getLedgerMetadata().getVersion());
         }
 
-        bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
+        bk.getLedgerManager().writeLedgerMetadata(ledgerId, getLedgerMetadata(), writeCb);
     }
 
     /**
@@ -476,7 +490,7 @@ public void asyncClose(CloseCallback cb, Object ctx) {
      */
     @Override
     public synchronized boolean isClosed() {
-        return metadata.isClosed();
+        return getLedgerMetadata().isClosed();
     }
 
     void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
@@ -536,6 +550,7 @@ public void safeRun() {
                 }
 
                 synchronized (LedgerHandle.this) {
+                    LedgerMetadata metadata = getLedgerMetadata();
                     prevState = metadata.getState();
                     prevLastEntryId = metadata.getLastEntryId();
                     prevLength = metadata.getLength();
@@ -556,6 +571,7 @@ public void safeRun() {
                 errorOutPendingAdds(rc, pendingAdds);
 
                 if (LOG.isDebugEnabled()) {
+                    LedgerMetadata metadata = getLedgerMetadata();
                     LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
                               + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
                 }
@@ -577,6 +593,7 @@ public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                                 ledgerId, BKException.codeLogger(newrc));
                                         cb.closeComplete(rc, LedgerHandle.this, ctx);
                                     } else {
+                                        LedgerMetadata metadata = getLedgerMetadata();
                                         metadata.setState(prevState);
                                         if (prevState.equals(State.CLOSED)) {
                                             metadata.close(prevLastEntryId);
@@ -1120,7 +1137,7 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             }
         }
@@ -1198,7 +1215,7 @@ private boolean isWritesetWritable(DistributionSchedule.WriteSet writeSet,
 
         int nonWritableCount = 0;
         for (int i = 0; i < sz; i++) {
-            if (!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(i), key)) {
+            if (!bk.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), key)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1277,7 +1294,7 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             } else {
                 long entryId = ++lastAddPushed;
@@ -1358,6 +1375,7 @@ public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Obj
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lastEntryId = metadata.getLastEntryId();
         }
@@ -1399,6 +1417,7 @@ public void asyncTryReadLastConfirmed(final ReadLastConfirmedCallback cb, final
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lastEntryId = metadata.getLastEntryId();
         }
@@ -1486,6 +1505,7 @@ public void asyncReadLastConfirmedAndEntry(final long entryId,
         boolean isClosed;
         long lac;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lac = metadata.getLastEntryId();
         }
@@ -1653,6 +1673,7 @@ public long tryReadLastConfirmed() throws InterruptedException, BKException {
     public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
         boolean isClosed;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             if (isClosed) {
                 lastAddConfirmed = metadata.getLastEntryId();
@@ -1714,7 +1735,7 @@ public long readExplicitLastConfirmed() throws InterruptedException, BKException
 
     // close the ledger and send fails to all the adds in the pipeline
     void handleUnrecoverableErrorDuringAdd(int rc) {
-        if (metadata.isInRecovery()) {
+        if (getLedgerMetadata().isInRecovery()) {
             // we should not close ledger if ledger is recovery mode
             // otherwise we may lose entry.
             errorOutPendingAdds(rc);
@@ -1796,6 +1817,7 @@ EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> fai
         final ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
         final long newEnsembleStartEntry = getLastAddConfirmed() + 1;
         final HashSet<Integer> replacedBookies = new HashSet<Integer>();
+        final LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             newEnsemble.addAll(metadata.currentEnsemble);
             for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
@@ -1869,6 +1891,7 @@ void handleDelayedWriteBookieFailure() {
             }
             return;
         }
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = replaceBookieInMetadata(delayedWriteFailedBookies, curNumEnsembleChanges);
@@ -1922,6 +1945,7 @@ void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies)
             handleUnrecoverableErrorDuringAdd(WriteException);
             return;
         }
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
@@ -2075,7 +2099,7 @@ public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                     LOG.error("[EnsembleChange-L{}-{}] : could not resolve ledger metadata conflict"
                                     + " while changing ensemble to: {}, local meta data is \n {} \n,"
                                     + " zk meta data is \n {} \n, closing ledger",
-                            ledgerId, ensembleChangeIdx, ensembleInfo.newEnsemble, metadata, newMeta);
+                            ledgerId, ensembleChangeIdx, ensembleInfo.newEnsemble, getLedgerMetadata(), newMeta);
                     handleUnrecoverableErrorDuringAdd(rc);
                 }
             }
@@ -2096,6 +2120,7 @@ public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
          * </p>
          */
         private boolean resolveConflict(LedgerMetadata newMeta) {
+            LedgerMetadata metadata = getLedgerMetadata();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts - local metadata = \n {} \n,"
                     + " zk metadata = \n {} \n", ledgerId, ensembleChangeIdx, metadata, newMeta);
@@ -2125,7 +2150,7 @@ private boolean resolveConflict(LedgerMetadata newMeta) {
                 }
                 if (-1 == diff) {
                     // Case 1: metadata is changed by other ones (e.g. Recovery)
-                    return updateMetadataIfPossible(newMeta);
+                    return updateMetadataIfPossible(metadata, newMeta);
                 }
                 return false;
             }
@@ -2142,7 +2167,7 @@ private boolean resolveConflict(LedgerMetadata newMeta) {
                 // didn't finish, so try to resolve conflicts with the metadata read from zookeeper and
                 // update ensemble changed metadata again.
                 if (areFailedBookiesReplaced(metadata, ensembleInfo)) {
-                    return updateMetadataIfPossible(newMeta);
+                    return updateMetadataIfPossible(metadata, newMeta);
                 }
             } else {
                 ensembleChangeCounter.inc();
@@ -2177,7 +2202,7 @@ private boolean areFailedBookiesReplaced(LedgerMetadata newMeta, EnsembleInfo en
             return replaced;
         }
 
-        private boolean updateMetadataIfPossible(LedgerMetadata newMeta) {
+        private boolean updateMetadataIfPossible(LedgerMetadata metadata, LedgerMetadata newMeta) {
             // if the local metadata is newer than zookeeper metadata, it means that metadata is updated
             // again when it was trying re-reading the metatada, re-kick the reread again
             if (metadata.isNewerThan(newMeta)) {
@@ -2260,6 +2285,7 @@ void recover(GenericCallback<Void> finalCb,
         boolean wasClosed = false;
         boolean wasInRecovery = false;
 
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (this) {
             if (metadata.isClosed()) {
                 if (forceRecovery) {
@@ -2311,7 +2337,7 @@ public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
                             if (rc != BKException.Code.OK) {
                                 cb.operationComplete(rc, null);
                             } else {
-                                metadata = newMeta;
+                                LedgerHandle.this.metadata = newMeta;
                                 recover(cb, listener, forceRecovery);
                             }
                         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 70e94300ce..153ceebeca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -206,7 +206,7 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             } else {
                 long currentLength = addToLength(op.payload.readableBytes());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 3c3a1dd01f..5f1722634b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -138,8 +138,9 @@ long getEntryId() {
     void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
-        lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
-                entryId, toSend, this, bookieIndex, flags, allowFailFast, lh.writeFlags);
+        lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                         lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
+                                         flags, allowFailFast, lh.writeFlags);
         ++pendingWriteRequests;
     }
 
@@ -265,7 +266,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
         int bookieIndex = (Integer) ctx;
         --pendingWriteRequests;
 
-        if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
+        if (!lh.getLedgerMetadata().currentEnsemble.get(bookieIndex).equals(addr)) {
             // ensemble has already changed, failure of this addr is immaterial
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index db87d89406..7b3aa9fcd4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -60,13 +60,14 @@
     PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
         this.lh = lh;
         this.cb = cb;
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readLac(metadata.currentEnsemble.get(i),
                     lh.ledgerId, this, i);
         }
     }
@@ -117,7 +118,7 @@ public void readLacComplete(int rc, long ledgerId, final ByteBuf lacBuffer, fina
                 // Too bad, this bookie did not give us a valid answer, we
                 // still might be able to recover. So, continue
                 LOG.error("Mac mismatch while reading  ledger: " + ledgerId + " LAC from bookie: "
-                        + lh.metadata.currentEnsemble.get(bookieIndex));
+                        + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
                 rc = BKException.Code.DigestMatchException;
             }
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a6ffd32b06..76332dfa80 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -493,7 +493,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
     }
 
     protected LedgerMetadata getLedgerMetadata() {
-        return lh.metadata;
+        return lh.getLedgerMetadata();
     }
 
     protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index af45e296ba..d1631306a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -70,8 +70,8 @@ void setLac(long lac) {
     }
 
     void sendWriteLacRequest(int bookieIndex) {
-        lh.bk.getBookieClient().writeLac(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
-                lac, toSend, this, bookieIndex);
+        lh.bk.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                         lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
     }
 
     void initiate(ByteBufList toSend) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 2cb615273e..112a1452e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -52,13 +52,14 @@ public ReadLastConfirmedOp(LedgerHandle lh, LastConfirmedDataCallback cb) {
         this.cb = cb;
         this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
         this.lh = lh;
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i, BookieProtocol.FLAG_NONE);
@@ -66,8 +67,9 @@ public void initiate() {
     }
 
     public void initiateWithFencing() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                               lh.ledgerId,
                                               BookieProtocol.LAST_ADD_CONFIRMED,
                                               this, i, BookieProtocol.FLAG_DO_FENCING,
@@ -96,7 +98,7 @@ public synchronized void readEntryComplete(final int rc, final long ledgerId, fi
                 // still might be able to recover though so continue
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
                           + " while reading last entry from bookie: "
-                          + lh.metadata.currentEnsemble.get(bookieIndex));
+                          + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index f2433edb91..3cc59a042b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -45,24 +45,30 @@
 
     class MetadataUpdater extends SafeRunnable {
 
-        final LedgerMetadata m;
+        final LedgerMetadata newMetadata;
 
         MetadataUpdater(LedgerMetadata metadata) {
-            this.m = metadata;
+            this.newMetadata = metadata;
         }
 
         @Override
         public void safeRun() {
-            Version.Occurred occurred =
-                    ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
-            if (Version.Occurred.BEFORE == occurred) {
-                LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m.toSafeString());
-                synchronized (ReadOnlyLedgerHandle.this) {
-                    if (this.m.isClosed()) {
-                            ReadOnlyLedgerHandle.this.lastAddConfirmed = this.m.getLastEntryId();
-                            ReadOnlyLedgerHandle.this.length = this.m.getLength();
+            while (true) {
+                LedgerMetadata currentMetadata = getLedgerMetadata();
+                Version.Occurred occurred = currentMetadata.getVersion().compare(newMetadata.getVersion());
+                if (Version.Occurred.BEFORE == occurred) {
+                    LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, newMetadata.toSafeString());
+                    synchronized (ReadOnlyLedgerHandle.this) {
+                        if (newMetadata.isClosed()) {
+                            ReadOnlyLedgerHandle.this.lastAddConfirmed = newMetadata.getLastEntryId();
+                            ReadOnlyLedgerHandle.this.length = newMetadata.getLength();
+                        }
+                        if (setLedgerMetadata(currentMetadata, newMetadata)) {
+                            break;
+                        }
                     }
-                    ReadOnlyLedgerHandle.this.metadata = this.m;
+                } else {
+                    break;
                 }
             }
         }
@@ -123,7 +129,7 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length,
     @Override
     void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
         blockAddCompletions.incrementAndGet();
-        synchronized (metadata) {
+        synchronized (getLedgerMetadata()) {
             try {
                 EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies,
                         numEnsembleChanges.incrementAndGet());
@@ -154,11 +160,11 @@ public void onChanged(long lid, LedgerMetadata newMetadata) {
         if (null == newMetadata) {
             return;
         }
-        Version.Occurred occurred =
-                this.metadata.getVersion().compare(newMetadata.getVersion());
+        LedgerMetadata currentMetadata = getLedgerMetadata();
+        Version.Occurred occurred = currentMetadata.getVersion().compare(newMetadata.getVersion());
         if (LOG.isDebugEnabled()) {
             LOG.debug("Try to update metadata from {} to {} : {}",
-                    this.metadata, newMetadata, occurred);
+                      currentMetadata, newMetadata, occurred);
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 441504c60c..b8a5f0d074 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -46,12 +46,13 @@
         this.lh = lh;
         this.cb = cb;
         this.maxRecoveredData = new RecoveryData(lac, 0);
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i, BookieProtocol.FLAG_NONE);
@@ -83,7 +84,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
             } catch (BKException.BKDigestMatchException e) {
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
                           + " while reading last entry from bookie: "
-                          + lh.metadata.currentEnsemble.get(bookieIndex));
+                          + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
             }
         } else if (BKException.Code.UnauthorizedAccessException == rc && !completed) {
             cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 8e2dd7dd4f..bb3e5532f7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -46,7 +46,7 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo
      * Returns that whether ledger is in open state.
      */
     public static boolean isLedgerOpen(LedgerHandle handle) {
-        return !handle.metadata.isClosed();
+        return !handle.getLedgerMetadata().isClosed();
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index f31feefd7d..b87c6661da 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -428,14 +428,16 @@ public void testRecoveryOnEntryGap() throws Exception {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend,
-            new WriteCallback() {
-                @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
-                    addSuccess.set(BKException.Code.OK == rc);
-                    addLatch.countDown();
-                }
-            }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+        lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
+                                         lh.getId(), lh.ledgerKey, entryId, toSend,
+                                         new WriteCallback() {
+                                             @Override
+                                             public void writeComplete(int rc, long ledgerId, long entryId,
+                                                                       BookieSocketAddress addr, Object ctx) {
+                                                 addSuccess.set(BKException.Code.OK == rc);
+                                                 addLatch.countDown();
+                                             }
+                                         }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         addLatch.await();
         assertTrue("add entry 14 should succeed", addSuccess.get());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 6e638105f6..49a435ec4e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -54,7 +54,7 @@ public void testPendingReadLacOpMissingExplicitLAC() throws Exception {
         PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending(
                             2,
@@ -70,7 +70,7 @@ public void initiate() {
                                 index);
 
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }
@@ -90,7 +90,7 @@ public void testPendingReadLacOpMissingLAC() throws Exception {
         PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
                     bkc.scheduler.schedule(() -> {
@@ -101,7 +101,7 @@ public void initiate() {
                                 null,
                                 index);
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services