You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by kr...@apache.org on 2019/06/04 17:44:07 UTC
[ambari] branch branch-2.7 updated: AMBARI-25293 - Logsearch:
logfeeder throws NPE when updating checkpoint (#2998)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new e7602a0 AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
e7602a0 is described below
commit e7602a087ef4ca759f6aed7b8f37e5ef3a20dcd5
Author: kasakrisz <33...@users.noreply.github.com>
AuthorDate: Tue Jun 4 19:43:58 2019 +0200
AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
---
.../logfeeder/input/file/FileCheckInHelper.java | 65 ++++++++++++++++++----
.../input/file/ResumeLineNumberHelper.java | 22 +++-----
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 7b8f0cd..f96c0d2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -18,18 +18,18 @@
*/
package org.apache.ambari.logfeeder.input.file;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.Map;
-
public class FileCheckInHelper {
private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
@@ -40,7 +40,14 @@ public class FileCheckInHelper {
public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
try {
Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey());
+ if (jsonCheckPoint == null) {
+ jsonCheckPoint = createNewCheckpointObject(inputFile);
+ attachCheckpointToInput(inputFile, jsonCheckPoint);
+ }
File checkPointFile = inputFile.getCheckPointFiles().get(inputMarker.getBase64FileKey());
+ if (checkPointFile == null || !checkPointFile.exists()) {
+ checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
+ }
int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
if (lineNumber > inputMarker.getLineNumber()) {
@@ -80,15 +87,49 @@ public class FileCheckInHelper {
FileUtil.move(tmpCheckPointFile, checkPointFile);
if (inputFile.isClosed()) {
- String logMessageKey = inputFile.getClass().getSimpleName() + "_FINAL_CHECKIN";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + inputFile.getShortDescription() +
- ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+ LOG.info(String.format("Wrote final checkPoint, input=%s, checkPointFile=%s, checkPoint=%s", inputFile.getShortDescription(), checkPointFile.getAbsolutePath(), jsonStr));
}
} catch (Throwable t) {
- String logMessageKey = inputFile.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + inputFile.getShortDescription(), t,
- LOG, Level.ERROR);
+ LOG.error("Caught exception checkIn. , input=" + inputFile.getShortDescription(), t);
+ }
+ }
+
+ /**
+ * Create a new checkpoint object pre-filled with the path and file key of the given input file
+ * @param inputFile file object which is used to fill the checkpoint defaults
+ * @return Created checkpoint object; it has no "file_key" entry if obtaining the key threw an exception
+ */
+ static Map<String, Object> createNewCheckpointObject(final InputFile inputFile) {
+ Map<String, Object> jsonCheckPoint = new HashMap<>();
+ jsonCheckPoint.put("file_path", inputFile.getFilePath());
+ try {
+ jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+ } catch (Exception e) {
+ LOG.error(String.format("Error during checkpoint object (path: %s) creation: %s", inputFile.getFilePath(), e.getMessage()), e); // pass e so the stack trace and cause are not lost
}
+ return jsonCheckPoint;
+ }
+
+ /**
+ * Register a json checkpoint object on an input file, keyed by that file's base64 file key
+ * @param inputFile input file object whose checkpoint map receives the entry
+ * @param jsonCheckPoint checkpoint related data to store for the input
+ */
+ static void attachCheckpointToInput(final InputFile inputFile, final Map<String, Object> jsonCheckPoint) throws Exception {
+ inputFile.getJsonCheckPoints()
+ .put(inputFile.getBase64FileKey(), jsonCheckPoint);
+ }
+
+ /**
+ * Build the checkpoint file object for an input and register it on that input
+ * @param inputFile input file object whose checkpoint file map receives the new entry
+ * @return The checkpoint file object, resolved inside the input manager's checkpoint folder
+ */
+ static File attachCheckpointFileToInput(final InputFile inputFile) throws Exception {
+ final String fileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+ final File target = new File(inputFile.getInputManager().getCheckPointFolderFile(), fileName);
+ inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), target);
+ return target;
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
index 9350200..614c3bc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -18,17 +18,16 @@
*/
package org.apache.ambari.logfeeder.input.file;
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.EOFException;
import java.io.File;
import java.io.RandomAccessFile;
-import java.util.HashMap;
import java.util.Map;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ResumeLineNumberHelper {
private static final Logger LOG = LoggerFactory.getLogger(ResumeLineNumberHelper.class);
@@ -43,10 +42,7 @@ public class ResumeLineNumberHelper {
try {
LOG.info("Checking existing checkpoint file. " + inputFile.getShortDescription());
- String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
- File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder, checkPointFileName);
- inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+ checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
Map<String, Object> jsonCheckPoint = null;
if (!checkPointFile.exists()) {
LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + " doesn't exist, starting to read it from the beginning");
@@ -74,12 +70,10 @@ public class ResumeLineNumberHelper {
}
if (jsonCheckPoint == null) {
// This seems to be first time, so creating the initial checkPoint object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", inputFile.getFilePath());
- jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+ jsonCheckPoint = FileCheckInHelper.createNewCheckpointObject(inputFile); // keep the result: otherwise null is attached below
}
- inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+ FileCheckInHelper.attachCheckpointToInput(inputFile, jsonCheckPoint);
} catch (Throwable t) {
LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);