You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by pw...@apache.org on 2021/08/05 07:32:22 UTC

[hudi] 03/03: [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)

This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8f8cd42e9792e0a9648b68f933929ad87cd503c5
Author: Mathieu <49...@users.noreply.github.com>
AuthorDate: Mon Jan 13 11:18:09 2020 +0800

    [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
---
 hudi-hadoop-mr/pom.xml                             |  7 ++++++
 .../org/apache/hudi/hadoop/HoodieHiveUtil.java     | 12 +++++-----
 .../hudi/hadoop/HoodieParquetInputFormat.java      | 26 ++++++++++----------
 .../hudi/hadoop/HoodieROTablePathFilter.java       | 28 +++++++++++-----------
 .../hudi/hadoop/RecordReaderValueIterator.java     |  6 ++---
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  | 27 ++++++++++-----------
 .../realtime/AbstractRealtimeRecordReader.java     | 24 +++++++++----------
 .../realtime/HoodieParquetRealtimeInputFormat.java | 22 ++++++++---------
 .../realtime/HoodieRealtimeRecordReader.java       |  6 ++---
 .../realtime/RealtimeCompactedRecordReader.java    | 14 +++++------
 10 files changed, 89 insertions(+), 83 deletions(-)

diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 6bc3c8e..2a222f3 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -81,6 +81,13 @@
       <artifactId>hive-exec</artifactId>
     </dependency>
 
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
     <!-- Hoodie - Test -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 1db8c54..f371719 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -20,12 +20,12 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieHiveUtil {
 
-  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+  public static final Logger LOG = LoggerFactory.getLogger(HoodieHiveUtil.class);
 
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
@@ -43,20 +43,20 @@ public class HoodieHiveUtil {
     if (maxCommits == MAX_COMMIT_ALL) {
       maxCommits = Integer.MAX_VALUE;
     }
-    LOG.info("Read max commits - " + maxCommits);
+    LOG.info("Read max commits - {}", maxCommits);
     return maxCommits;
   }
 
   public static String readStartCommitTime(JobContext job, String tableName) {
     String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
-    LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
+    LOG.info("Read start commit time - {}", job.getConfiguration().get(startCommitTimestampName));
     return job.getConfiguration().get(startCommitTimestampName);
   }
 
   public static String readMode(JobContext job, String tableName) {
     String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
     String mode = job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
-    LOG.info(modePropertyName + ": " + mode);
+    LOG.info("Hoodie consume mode pattern is : {}, mode is : {}", modePropertyName, mode);
     return mode;
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index e8f7de0..ea92f11 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ import java.util.stream.Collectors;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetInputFormat.class);
 
   protected Configuration conf;
 
@@ -69,7 +69,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // Get all the file status from FileInputFormat and then do the filter
     FileStatus[] fileStatuses = super.listStatus(job);
     Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
-    LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+    LOG.info("Found a total of {} groups", groupedFileStatus.size());
     List<FileStatus> returns = new ArrayList<>();
     for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
       HoodieTableMetaClient metadata = entry.getKey();
@@ -81,7 +81,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
 
       FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+        LOG.debug("Hoodie Metadata initialized with completed commit Ts as : {}", metadata);
       }
       String tableName = metadata.getTableConfig().getTableName();
       String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
@@ -95,24 +95,24 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
         String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
         // Total number of commits to return in this batch. Set this to -1 to get all the commits.
         Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
-        LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+        LOG.info("Last Incremental timestamp was set as {}", lastIncrementalTs);
         List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
             .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
         List<HoodieDataFile> filteredFiles =
             roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
         for (HoodieDataFile filteredFile : filteredFiles) {
-          LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
+          LOG.info("Processing incremental hoodie file - {}", filteredFile.getPath());
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
         }
-        LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie incremental filter {}", filteredFiles.size());
       } else {
         // filter files on the latest commit found
         List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
-        LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie filter {}", filteredFiles.size());
         for (HoodieDataFile filteredFile : filteredFiles) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+            LOG.debug("Processing latest hoodie file - {}", filteredFile.getPath());
           }
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
@@ -133,7 +133,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     try {
       if (dataFile.getFileSize() == 0) {
         FileSystem fs = dataPath.getFileSystem(conf);
-        LOG.info("Refreshing file status " + dataFile.getPath());
+        LOG.info("Refreshing file status {}", dataFile.getPath());
         return new HoodieDataFile(fs.getFileStatus(dataPath));
       }
       return dataFile;
@@ -160,7 +160,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
           metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
           nonHoodieBasePath = null;
         } catch (DatasetNotFoundException | InvalidDatasetException e) {
-          LOG.info("Handling a non-hoodie path " + status.getPath());
+          LOG.info("Handling a non-hoodie path {}", status.getPath());
           metadata = null;
           nonHoodieBasePath = status.getPath().getParent().toString();
         }
@@ -213,7 +213,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       levels = metadata.getPartitionDepth();
     }
     Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    LOG.info("Reading hoodie metadata from path {}", baseDir);
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index fae8111..4670e4f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieROTablePathFilter implements PathFilter, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieROTablePathFilter.class);
 
   /**
    * Its quite common, to have all files from a given partition path be passed into accept(), cache the check for hoodie
@@ -88,7 +88,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
   public boolean accept(Path path) {
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking acceptance for path " + path);
+      LOG.debug("Checking acceptance for path {}", path);
     }
     Path folder = null;
     try {
@@ -101,15 +101,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       // Try to use the caches.
       if (nonHoodiePathCache.contains(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Accepting non-hoodie path from cache: " + path);
+          LOG.debug("Accepting non-hoodie path from cache: {}", path);
         }
         return true;
       }
 
       if (hoodiePathCache.containsKey(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("%s Hoodie path checked against cache, accept => %s \n", path,
-              hoodiePathCache.get(folder.toString()).contains(path)));
+          LOG.debug("{} Hoodie path checked against cache, accept => {}", path,
+              hoodiePathCache.get(folder.toString()).contains(path));
         }
         return hoodiePathCache.get(folder.toString()).contains(path);
       }
@@ -119,7 +119,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
           || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Skipping Hoodie Metadata file  %s \n", filePath));
+          LOG.debug("Skipping Hoodie Metadata file {}", filePath);
         }
         return false;
       }
@@ -144,22 +144,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
           if (!hoodiePathCache.containsKey(folder.toString())) {
             hoodiePathCache.put(folder.toString(), new HashSet<>());
           }
-          LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + ", caching " + latestFiles.size()
-              + " files under " + folder);
+          LOG.info("Based on hoodie metadata from base path: {}, caching {} files under {}", baseDir,
+              latestFiles.size(), folder);
           for (HoodieDataFile lfile : latestFiles) {
             hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath()));
           }
 
           // accept the path, if its among the latest files.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("%s checked after cache population, accept => %s \n", path,
-                hoodiePathCache.get(folder.toString()).contains(path)));
+            LOG.debug("{} checked after cache population, accept => {}", path,
+                hoodiePathCache.get(folder.toString()).contains(path));
           }
           return hoodiePathCache.get(folder.toString()).contains(path);
         } catch (DatasetNotFoundException e) {
           // Non-hoodie path, accept it.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", folder.toString()));
+            LOG.debug("(1) Caching non-hoodie path under {}", folder);
           }
           nonHoodiePathCache.add(folder.toString());
           return true;
@@ -167,7 +167,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       } else {
         // files is at < 3 level depth in FS tree, can't be hoodie dataset
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("(2) Caching non-hoodie path under %s \n", folder.toString()));
+          LOG.debug("(2) Caching non-hoodie path under {}", folder);
         }
         nonHoodiePathCache.add(folder.toString());
         return true;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 0386186..7ffa3bf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -21,8 +21,8 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -36,7 +36,7 @@ import java.util.NoSuchElementException;
  */
 public class RecordReaderValueIterator<K, V> implements Iterator<V> {
 
-  private static final Logger LOG = LogManager.getLogger(RecordReaderValueIterator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class);
 
   private final RecordReader<K, V> reader;
   private V nextVal = null;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index cd1cea3..4ed7ba9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -93,7 +93,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     extends HiveInputFormat<K, V> {
 
   private static final String CLASS_NAME = HoodieCombineHiveInputFormat.class.getName();
-  public static final Logger LOG = LogManager.getLogger(CLASS_NAME);
+  public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   // max number of threads we can use to check non-combinable paths
   private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
@@ -125,7 +125,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         if (inputFormat instanceof AvoidSplitCombination
             && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
+            LOG.debug("The path [{}] is being parked for HiveInputFormat.getSplits", paths[i + start]);
           }
           nonCombinablePathIndices.add(i + start);
         }
@@ -388,7 +388,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      LOG.info("Input Format => " + inputFormatClass.getName());
+      LOG.info("Input Format => {}", inputFormatClass.getName());
       // **MOD** Set the hoodie filter in the combine
       if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
         combine.setHoodieFilter(true);
@@ -428,11 +428,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         f = poolMap.get(combinePathInputFormat);
         if (f == null) {
           f = new CombineFilter(filterPath);
-          LOG.info("CombineHiveInputSplit creating pool for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit creating pool for {}; using filter path {}", path, filterPath);
           combine.createPool(job, f);
           poolMap.put(combinePathInputFormat, f);
         } else {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit: pool is already created for {}; using filter path {}", path, filterPath);
           f.addPath(filterPath);
         }
       } else {
@@ -481,7 +481,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       result.add(csplit);
     }
 
-    LOG.info("number of splits " + result.size());
+    LOG.info("number of splits {}", result.size());
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
@@ -491,8 +491,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
   @VisibleForTesting
   public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
       throws ExecutionException, InterruptedException {
-    LOG.info("Total number of paths: " + paths.length + ", launching " + numThreads
-        + " threads to check non-combinable ones.");
+    LOG.info("Total number of paths: {}, launching {} threads to check non-combinable ones.", paths.length, numThreads);
     int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
 
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -555,8 +554,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     // Store the previous value for the path specification
     String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("The received input paths are: [" + oldPaths + "] against the property "
-          + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+      LOG.debug("The received input paths are: [{}] against the property {}", oldPaths,
+          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     }
 
     // Process the normal splits
@@ -589,7 +588,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     // clear work from ThreadLocal after splits generated in case of thread is reused in pool.
     Utilities.clearWorkMapForConf(job);
 
-    LOG.info("Number of all splits " + result.size());
+    LOG.info("Number of all splits {}", result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
@@ -691,7 +690,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         retLists.add(split);
         long splitgLength = split.getLength();
         if (size + splitgLength >= targetSize) {
-          LOG.info("Sample alias " + entry.getValue() + " using " + (i + 1) + "splits");
+          LOG.info("Sample alias {} using {} splits", entry.getValue(), i + 1);
           if (size + splitgLength > targetSize) {
             ((InputSplitShim) split).shrinkSplit(targetSize - size);
           }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index d7b50d4..6c7bab1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -84,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
   // Default file path prefix for spillable file
   public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final HoodieRealtimeFileSplit split;
   protected final JobConf jobConf;
@@ -98,12 +98,12 @@ public abstract class AbstractRealtimeRecordReader {
   public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
     this.split = split;
     this.jobConf = job;
-    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
-    LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+    LOG.info("cfg ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    LOG.info("columnIds ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    LOG.info("partitioningColumns ==> {}", job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
     try {
       this.usesCustomPayload = usesCustomPayload();
-      LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
+      LOG.info("usesCustomPayload ==> {}", this.usesCustomPayload);
       baseFileSchema = readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
@@ -339,10 +339,10 @@ public abstract class AbstractRealtimeRecordReader {
         LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf);
     if (schemaFromLogFile == null) {
       writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
-      LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Parquet => {}", writerSchema.getFields());
     } else {
       writerSchema = schemaFromLogFile;
-      LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Log => {}", writerSchema.getFields());
     }
     // Add partitioning fields to writer schema for resulting row to contain null values for these fields
     String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
@@ -359,8 +359,8 @@ public abstract class AbstractRealtimeRecordReader {
     // to null out fields not present before
 
     readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
-    LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-        split.getDeltaFilePaths(), split.getPath(), projectionFields));
+    LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+        split.getDeltaFilePaths(), split.getPath(), projectionFields);
   }
 
   private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
@@ -377,7 +377,7 @@ public abstract class AbstractRealtimeRecordReader {
       } else {
         // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
         // They will get skipped as they won't be found in the original schema.
-        LOG.debug("Skipping Hive Column => " + columnName);
+        LOG.debug("Skipping Hive Column => {}", columnName);
       }
     }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index f62f288..0320257 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -46,8 +46,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,7 +65,7 @@ import java.util.stream.Stream;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetRealtimeInputFormat.class);
 
   // These positions have to be deterministic across all tables
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
@@ -148,7 +148,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
         throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
       }
     });
-    LOG.info("Returning a total splits of " + rtSplits.size());
+    LOG.info("Returning a total splits of {}", rtSplits.size());
     return rtSplits.toArray(new InputSplit[rtSplits.size()]);
   }
 
@@ -180,9 +180,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
       conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
+        LOG.debug("Adding extra column {}, to enable log merging cols ({}) ids ({}) ", fieldName,
             conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
       }
     }
     return conf;
@@ -210,7 +210,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
+        LOG.debug("The projection Ids: [{}] start with ','. First comma is removed", columnIds);
       }
     }
     return conf;
@@ -226,9 +226,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // actual heavy lifting of reading the parquet files happen.
     if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
       synchronized (job) {
-        LOG.info(
-            "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-                + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+        LOG.info("Before adding Hoodie columns, Projections : {}, Ids : {}",
+            job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+                job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
         if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
@@ -244,7 +244,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
       }
     }
 
-    LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+    LOG.info("Creating record reader with readCols : {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     // sanity check
     Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
index cb8606e..bd02971 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -39,7 +39,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
   public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip";
   // By default, we do merged-reading
   public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
-  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReader.class);
   private final RecordReader<NullWritable, ArrayWritable> reader;
 
   public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index deeaaf4..5ad59af 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -40,7 +40,7 @@ import java.util.Map;
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
@@ -108,8 +108,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
-              arrayWritableToString(aWritable)));
+          LOG.debug("key {}, base values: {}, log values: {}", key, arrayWritableToString(arrayWritable),
+              arrayWritableToString(aWritable));
         }
         Writable[] originalValue = arrayWritable.get();
         try {
@@ -117,8 +117,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           arrayWritable.set(originalValue);
         } catch (RuntimeException re) {
           LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + arrayWritableToString(aWritable));
+          LOG.error("Base record : {}", arrayWritableToString(arrayWritable));
+          LOG.error("Log record : {}", arrayWritableToString(aWritable));
           String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
               + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
           throw new RuntimeException(errMsg, re);