You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by kr...@apache.org on 2019/06/04 17:44:07 UTC

[ambari] branch branch-2.7 updated: AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new e7602a0  AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
e7602a0 is described below

commit e7602a087ef4ca759f6aed7b8f37e5ef3a20dcd5
Author: kasakrisz <33...@users.noreply.github.com>
AuthorDate: Tue Jun 4 19:43:58 2019 +0200

    AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
---
 .../logfeeder/input/file/FileCheckInHelper.java    | 65 ++++++++++++++++++----
 .../input/file/ResumeLineNumberHelper.java         | 22 +++-----
 2 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 7b8f0cd..f96c0d2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -18,18 +18,18 @@
  */
 package org.apache.ambari.logfeeder.input.file;
 
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.Map;
-
 public class FileCheckInHelper {
 
   private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
@@ -40,7 +40,14 @@ public class FileCheckInHelper {
   public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
     try {
       Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey());
+      if (jsonCheckPoint == null) {
+        jsonCheckPoint = createNewCheckpointObject(inputFile);
+        attachCheckpointToInput(inputFile, jsonCheckPoint);
+      }
       File checkPointFile = inputFile.getCheckPointFiles().get(inputMarker.getBase64FileKey());
+      if (checkPointFile == null || !checkPointFile.exists()) {
+        checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
+      }
 
       int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
       if (lineNumber > inputMarker.getLineNumber()) {
@@ -80,15 +87,49 @@ public class FileCheckInHelper {
       FileUtil.move(tmpCheckPointFile, checkPointFile);
 
       if (inputFile.isClosed()) {
-        String logMessageKey = inputFile.getClass().getSimpleName() + "_FINAL_CHECKIN";
-        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + inputFile.getShortDescription() +
-          ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+        LOG.info(String.format("Wrote final checkPoint, input=%s, checkPointFile=%s, checkPoint=%s", inputFile.getShortDescription(), checkPointFile.getAbsolutePath(), jsonStr));
       }
     } catch (Throwable t) {
-      String logMessageKey = inputFile.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + inputFile.getShortDescription(), t,
-        LOG, Level.ERROR);
+      LOG.error("Caught exception checkIn. , input=" + inputFile.getShortDescription(), t);
+    }
+  }
+
+  /**
+   * Create new checkpoint object
+   * @param inputFile file object which is used to fill the checkpoint defaults
+   * @return Created checkpoint object
+   */
+  static Map<String, Object> createNewCheckpointObject(final InputFile inputFile) {
+    Map<String, Object> jsonCheckPoint = new HashMap<>();
+    jsonCheckPoint.put("file_path", inputFile.getFilePath());
+    try {
+      jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+    } catch (Exception e) {
+      LOG.error(String.format("Error during checkpoint object (path: %s) creationg: %s", inputFile.getFilePath(), e.getMessage()));
     }
+    return jsonCheckPoint;
+  }
+
+  /**
+   * Attach a json checkpoint object to an input file
+   * @param inputFile input file object that will have the new checkpoint
+   * @param jsonCheckPoint holds checkpoint related data
+   */
+  static void attachCheckpointToInput(final InputFile inputFile, final Map<String, Object> jsonCheckPoint) throws Exception {
+    inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+  }
+
+  /**
+   * Create a new file object for input checkpoint
+   * @param inputFile input file object that will have the new checkpoint file
+   * @return Newly created checkpoint file
+   */
+  static File attachCheckpointFileToInput(final InputFile inputFile) throws Exception {
+    String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+    File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
+    File checkPointFile = new File(checkPointFolder, checkPointFileName);
+    inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+    return checkPointFile;
   }
 
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
index 9350200..614c3bc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -18,17 +18,16 @@
  */
 package org.apache.ambari.logfeeder.input.file;
 
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.File;
 import java.io.RandomAccessFile;
-import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ResumeLineNumberHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(ResumeLineNumberHelper.class);
@@ -43,10 +42,7 @@ public class ResumeLineNumberHelper {
     try {
       LOG.info("Checking existing checkpoint file. " + inputFile.getShortDescription());
 
-      String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
-      File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
-      checkPointFile = new File(checkPointFolder, checkPointFileName);
-      inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+      checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
       Map<String, Object> jsonCheckPoint = null;
       if (!checkPointFile.exists()) {
         LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + " doesn't exist, starting to read it from the beginning");
@@ -74,12 +70,10 @@ public class ResumeLineNumberHelper {
       }
       if (jsonCheckPoint == null) {
         // This seems to be first time, so creating the initial checkPoint object
-        jsonCheckPoint = new HashMap<String, Object>();
-        jsonCheckPoint.put("file_path", inputFile.getFilePath());
-        jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+        FileCheckInHelper.createNewCheckpointObject(inputFile);
       }
 
-      inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+      FileCheckInHelper.attachCheckpointToInput(inputFile, jsonCheckPoint);
 
     } catch (Throwable t) {
       LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);