You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original) was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/14 17:12:38 UTC

hadoop git commit: HDFS-13544. Improve logging for JournalNode in federated cluster.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6653f4ba2 -> 6beb25ab7


HDFS-13544. Improve logging for JournalNode in federated cluster.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6beb25ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6beb25ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6beb25ab

Branch: refs/heads/trunk
Commit: 6beb25ab7e4f5454dba0315a296081e61753f301
Parents: 6653f4b
Author: Hanisha Koneru <ha...@apache.org>
Authored: Mon May 14 10:12:08 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Mon May 14 10:12:08 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/qjournal/server/Journal.java    | 115 +++++++++++--------
 1 file changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6beb25ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 408ce76..452664a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -208,11 +208,12 @@ public class Journal implements Closeable {
     while (!files.isEmpty()) {
       EditLogFile latestLog = files.remove(files.size() - 1);
       latestLog.scanLog(Long.MAX_VALUE, false);
-      LOG.info("Latest log is " + latestLog);
+      LOG.info("Latest log is " + latestLog + " ; journal id: " + journalId);
       if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
         // the log contains no transactions
         LOG.warn("Latest log " + latestLog + " has no transactions. " +
-            "moving it aside and looking for previous log");
+            "moving it aside and looking for previous log"
+            + " ; journal id: " + journalId);
         latestLog.moveAsideEmptyFile();
       } else {
         return latestLog;
@@ -230,7 +231,7 @@ public class Journal implements Closeable {
     Preconditions.checkState(nsInfo.getNamespaceID() != 0,
         "can't format with uninitialized namespace info: %s",
         nsInfo);
-    LOG.info("Formatting " + this + " with namespace info: " +
+    LOG.info("Formatting journal id : " + journalId + " with namespace info: " +
         nsInfo);
     storage.format(nsInfo);
     refreshCachedData();
@@ -323,7 +324,7 @@ public class Journal implements Closeable {
     // any other that we've promised. 
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
-          getLastPromisedEpoch());
+          getLastPromisedEpoch() + " ; journal id: " + journalId);
     }
     
     updateLastPromisedEpoch(epoch);
@@ -343,7 +344,8 @@ public class Journal implements Closeable {
 
   private void updateLastPromisedEpoch(long newEpoch) throws IOException {
     LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
-        " to " + newEpoch + " for client " + Server.getRemoteIp());
+        " to " + newEpoch + " for client " + Server.getRemoteIp() +
+        " ; journal id: " + journalId);
     lastPromisedEpoch.set(newEpoch);
     
     // Since we have a new writer, reset the IPC serial - it will start
@@ -378,7 +380,7 @@ public class Journal implements Closeable {
     }
 
     checkSync(curSegment != null,
-        "Can't write, no segment open");
+        "Can't write, no segment open" + " ; journal id: " + journalId);
 
     if (curSegmentTxId != segmentTxId) {
       // Sanity check: it is possible that the writer will fail IPCs
@@ -389,17 +391,20 @@ public class Journal implements Closeable {
       // and throw an exception.
       JournalOutOfSyncException e = new JournalOutOfSyncException(
           "Writer out of sync: it thinks it is writing segment " + segmentTxId
-          + " but current segment is " + curSegmentTxId);
+              + " but current segment is " + curSegmentTxId
+              + " ; journal id: " + journalId);
       abortCurSegment();
       throw e;
     }
       
     checkSync(nextTxId == firstTxnId,
-        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
+        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId
+            + " ; journal id: " + journalId);
     
     long lastTxnId = firstTxnId + numTxns - 1;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
+      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
+          " ; journal id: " + journalId);
     }
 
     // If the edit has already been marked as committed, we know
@@ -423,7 +428,7 @@ public class Journal implements Closeable {
 
     if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
       LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
-               " took " + milliSeconds + "ms");
+               " took " + milliSeconds + "ms" + " ; journal id: " + journalId);
     }
 
     if (isLagging) {
@@ -455,7 +460,7 @@ public class Journal implements Closeable {
     if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
       throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
           " is less than the last promised epoch " +
-          lastPromisedEpoch.get());
+          lastPromisedEpoch.get() + " ; journal id: " + journalId);
     } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
       // A newer client has arrived. Fence any previous writers by updating
       // the promise.
@@ -465,16 +470,16 @@ public class Journal implements Closeable {
     // Ensure that the IPCs are arriving in-order as expected.
     checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
         "IPC serial %s from client %s was not higher than prior highest " +
-        "IPC serial %s", reqInfo.getIpcSerialNumber(),
-        Server.getRemoteIp(),
-        currentEpochIpcSerial);
+        "IPC serial %s ; journal id: %s", reqInfo.getIpcSerialNumber(),
+        Server.getRemoteIp(), currentEpochIpcSerial, journalId);
     currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
 
     if (reqInfo.hasCommittedTxId()) {
       Preconditions.checkArgument(
           reqInfo.getCommittedTxId() >= committedTxnId.get(),
           "Client trying to move committed txid backward from " +
-          committedTxnId.get() + " to " + reqInfo.getCommittedTxId());
+          committedTxnId.get() + " to " + reqInfo.getCommittedTxId() +
+              " ; journal id: " + journalId);
       
       committedTxnId.set(reqInfo.getCommittedTxId());
     }
@@ -486,7 +491,7 @@ public class Journal implements Closeable {
     if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
       throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
           " is not the current writer epoch  " +
-          lastWriterEpoch.get());
+          lastWriterEpoch.get() + " ; journal id: " + journalId);
     }
   }
   
@@ -497,7 +502,8 @@ public class Journal implements Closeable {
   private void checkFormatted() throws JournalNotFormattedException {
     if (!isFormatted()) {
       throw new JournalNotFormattedException("Journal " +
-          storage.getSingularStorageDir() + " not formatted");
+          storage.getSingularStorageDir() + " not formatted" +
+          " ; journal id: " + journalId);
     }
   }
 
@@ -542,7 +548,8 @@ public class Journal implements Closeable {
     if (curSegment != null) {
       LOG.warn("Client is requesting a new log segment " + txid + 
           " though we are already writing " + curSegment + ". " +
-          "Aborting the current segment in order to begin the new one.");
+          "Aborting the current segment in order to begin the new one." +
+          " ; journal id: " + journalId);
       // The writer may have lost a connection to us and is now
       // re-connecting after the connection came back.
       // We should abort our own old segment.
@@ -556,7 +563,7 @@ public class Journal implements Closeable {
     if (existing != null) {
       if (!existing.isInProgress()) {
         throw new IllegalStateException("Already have a finalized segment " +
-            existing + " beginning at " + txid);
+            existing + " beginning at " + txid + " ; journal id: " + journalId);
       }
       
       // If it's in-progress, it should only contain one transaction,
@@ -565,7 +572,8 @@ public class Journal implements Closeable {
       existing.scanLog(Long.MAX_VALUE, false);
       if (existing.getLastTxId() != existing.getFirstTxId()) {
         throw new IllegalStateException("The log file " +
-            existing + " seems to contain valid transactions");
+            existing + " seems to contain valid transactions" +
+            " ; journal id: " + journalId);
       }
     }
     
@@ -573,7 +581,7 @@ public class Journal implements Closeable {
     if (curLastWriterEpoch != reqInfo.getEpoch()) {
       LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
           " to " + reqInfo.getEpoch() + " for client " +
-          Server.getRemoteIp());
+          Server.getRemoteIp() + " ; journal id: " + journalId);
       lastWriterEpoch.set(reqInfo.getEpoch());
     }
 
@@ -608,8 +616,8 @@ public class Journal implements Closeable {
       
       checkSync(nextTxId == endTxId + 1,
           "Trying to finalize in-progress log segment %s to end at " +
-          "txid %s but only written up to txid %s",
-          startTxId, endTxId, nextTxId - 1);
+          "txid %s but only written up to txid %s ; journal id: %s",
+          startTxId, endTxId, nextTxId - 1, journalId);
       // No need to validate the edit log if the client is finalizing
       // the log segment that it was just writing to.
       needsValidation = false;
@@ -618,25 +626,27 @@ public class Journal implements Closeable {
     FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
     if (elf == null) {
       throw new JournalOutOfSyncException("No log file to finalize at " +
-          "transaction ID " + startTxId);
+          "transaction ID " + startTxId + " ; journal id: " + journalId);
     }
 
     if (elf.isInProgress()) {
       if (needsValidation) {
         LOG.info("Validating log segment " + elf.getFile() + " about to be " +
-            "finalized");
+            "finalized ; journal id: " + journalId);
         elf.scanLog(Long.MAX_VALUE, false);
   
         checkSync(elf.getLastTxId() == endTxId,
             "Trying to finalize in-progress log segment %s to end at " +
-            "txid %s but log %s on disk only contains up to txid %s",
-            startTxId, endTxId, elf.getFile(), elf.getLastTxId());
+            "txid %s but log %s on disk only contains up to txid %s " +
+            "; journal id: %s",
+            startTxId, endTxId, elf.getFile(), elf.getLastTxId(), journalId);
       }
       fjm.finalizeLogSegment(startTxId, endTxId);
     } else {
       Preconditions.checkArgument(endTxId == elf.getLastTxId(),
           "Trying to re-finalize already finalized log " +
-              elf + " with different endTxId " + endTxId);
+              elf + " with different endTxId " + endTxId +
+              " ; journal id: " + journalId);
     }
 
     // Once logs are finalized, a different length will never be decided.
@@ -667,7 +677,8 @@ public class Journal implements Closeable {
     File paxosFile = storage.getPaxosFile(segmentTxId);
     if (paxosFile.exists()) {
       if (!paxosFile.delete()) {
-        throw new IOException("Unable to delete paxos file " + paxosFile);
+        throw new IOException("Unable to delete paxos file " + paxosFile +
+            " ; journal id: " + journalId);
       }
     }
   }
@@ -717,7 +728,7 @@ public class Journal implements Closeable {
     }
     if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
       LOG.info("Edit log file " + elf + " appears to be empty. " +
-          "Moving it aside...");
+          "Moving it aside..." + " ; journal id: " + journalId);
       elf.moveAsideEmptyFile();
       return null;
     }
@@ -727,7 +738,7 @@ public class Journal implements Closeable {
         .setIsInProgress(elf.isInProgress())
         .build();
     LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
-        TextFormat.shortDebugString(ret));
+        TextFormat.shortDebugString(ret) + " ; journal id: " + journalId);
     return ret;
   }
 
@@ -771,7 +782,7 @@ public class Journal implements Closeable {
     
     PrepareRecoveryResponseProto resp = builder.build();
     LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
-        TextFormat.shortDebugString(resp));
+        TextFormat.shortDebugString(resp) + " ; journal id: " + journalId);
     return resp;
   }
   
@@ -792,8 +803,8 @@ public class Journal implements Closeable {
     // at least one transaction.
     Preconditions.checkArgument(segment.getEndTxId() > 0 &&
         segment.getEndTxId() >= segmentTxId,
-        "bad recovery state for segment %s: %s",
-        segmentTxId, TextFormat.shortDebugString(segment));
+        "bad recovery state for segment %s: %s ; journal id: %s",
+        segmentTxId, TextFormat.shortDebugString(segment), journalId);
     
     PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
     PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
@@ -806,8 +817,9 @@ public class Journal implements Closeable {
     // checkRequest() call above should filter non-increasing epoch numbers.
     if (oldData != null) {
       alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
-          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
-          oldData, newData);
+          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: " +
+              "%s\nJournalId: %s\n",
+          oldData, newData, journalId);
     }
     
     File syncedFile = null;
@@ -817,7 +829,7 @@ public class Journal implements Closeable {
         currentSegment.getEndTxId() != segment.getEndTxId()) {
       if (currentSegment == null) {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
-            ": no current segment in place");
+            ": no current segment in place ; journal id: " + journalId);
         
         // Update the highest txid for lag metrics
         updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
@@ -825,7 +837,7 @@ public class Journal implements Closeable {
       } else {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": old segment " + TextFormat.shortDebugString(currentSegment) +
-            " is not the right length");
+            " is not the right length ; journal id: " + journalId);
         
         // Paranoid sanity check: if the new log is shorter than the log we
         // currently have, we should not end up discarding any transactions
@@ -838,14 +850,15 @@ public class Journal implements Closeable {
               " with new segment " +
               TextFormat.shortDebugString(segment) + 
               ": would discard already-committed txn " +
-              committedTxnId.get());
+              committedTxnId.get() +
+              " ; journal id: " + journalId);
         }
         
         // Another paranoid check: we should not be asked to synchronize a log
         // on top of a finalized segment.
         alwaysAssert(currentSegment.getIsInProgress(),
-            "Should never be asked to synchronize a different log on top of an " +
-            "already-finalized segment");
+            "Should never be asked to synchronize a different log on top of " +
+            "an already-finalized segment ; journal id: " + journalId);
         
         // If we're shortening the log, update our highest txid
         // used for lag metrics.
@@ -858,7 +871,7 @@ public class Journal implements Closeable {
     } else {
       LOG.info("Skipping download of log " +
           TextFormat.shortDebugString(segment) +
-          ": already have up-to-date logs");
+          ": already have up-to-date logs ; journal id: " + journalId);
     }
     
     // This is one of the few places in the protocol where we have a single
@@ -890,12 +903,12 @@ public class Journal implements Closeable {
     }
 
     LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
-        TextFormat.shortDebugString(newData));
+        TextFormat.shortDebugString(newData) + " ; journal id: " + journalId);
   }
 
   private LongRange txnRange(SegmentStateProto seg) {
     Preconditions.checkArgument(seg.hasEndTxId(),
-        "invalid segment: %s", seg);
+        "invalid segment: %s ; journal id: %s", seg, journalId);
     return new LongRange(seg.getStartTxId(), seg.getEndTxId());
   }
 
@@ -970,7 +983,7 @@ public class Journal implements Closeable {
     if (tmp.exists()) {
       File dst = storage.getInProgressEditLog(segmentId);
       LOG.info("Rolling forward previously half-completed synchronization: " +
-          tmp + " -> " + dst);
+          tmp + " -> " + dst + " ; journal id: " + journalId);
       FileUtil.replaceFile(tmp, dst);
     }
   }
@@ -991,8 +1004,8 @@ public class Journal implements Closeable {
       PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
       Preconditions.checkState(ret != null &&
           ret.getSegmentState().getStartTxId() == segmentTxId,
-          "Bad persisted data for segment %s: %s",
-          segmentTxId, ret);
+          "Bad persisted data for segment %s: %s ; journal id: %s",
+          segmentTxId, ret, journalId);
       return ret;
     } finally {
       IOUtils.closeStream(in);
@@ -1041,7 +1054,7 @@ public class Journal implements Closeable {
     storage.cTime = sInfo.cTime;
     int oldLV = storage.getLayoutVersion();
     storage.layoutVersion = sInfo.layoutVersion;
-    LOG.info("Starting upgrade of edits directory: "
+    LOG.info("Starting upgrade of edits directory: " + storage.getRoot()
         + ".\n   old LV = " + oldLV
         + "; old CTime = " + oldCTime
         + ".\n   new LV = " + storage.getLayoutVersion()
@@ -1112,7 +1125,7 @@ public class Journal implements Closeable {
     if (endTxId <= committedTxnId.get()) {
       if (!finalFile.getParentFile().exists()) {
         LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
-            "segment move to current directory");
+            "segment move to current directory ; journal id: " + journalId);
         return false;
       }
       Files.move(tmpFile.toPath(), finalFile.toPath(),
@@ -1122,13 +1135,13 @@ public class Journal implements Closeable {
       } else {
         success = false;
         LOG.warn("Unable to move edits file from " + tmpFile + " to " +
-            finalFile);
+            finalFile + " ; journal id: " + journalId);
       }
     } else {
       success = false;
       LOG.error("The endTxId of the temporary file is not less than the " +
           "last committed transaction id. Aborting move to final file" +
-          finalFile);
+          finalFile + " ; journal id: " + journalId);
     }
 
     return success;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org